Skip to main content

rivven_operator/
crd.rs

1//! Custom Resource Definitions for Rivven Kubernetes Operator
2//!
3//! This module defines the `RivvenCluster` CRD that represents a Rivven
4//! distributed streaming cluster in Kubernetes.
5
6use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use regex::Regex;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeMap;
12use std::sync::LazyLock;
13use validator::{Validate, ValidationError};
14
15/// Regex for validating Kubernetes resource quantities (e.g., "10Gi", "100Mi")
16static QUANTITY_REGEX: LazyLock<Regex> =
17    LazyLock::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());
18
19/// Regex for validating Kubernetes names (RFC 1123 subdomain)
20static NAME_REGEX: LazyLock<Regex> =
21    LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23/// Validate a Kubernetes resource quantity string
24fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25    if QUANTITY_REGEX.is_match(value) {
26        Ok(())
27    } else {
28        Err(ValidationError::new("invalid_quantity")
29            .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30    }
31}
32
33/// Validate a container image reference
34fn validate_image(value: &str) -> Result<(), ValidationError> {
35    if value.is_empty() {
36        return Ok(()); // Empty is allowed (uses default)
37    }
38    if value.len() > 255 {
39        return Err(ValidationError::new("image_too_long")
40            .with_message("image reference exceeds 255 characters".into()));
41    }
42    // Basic format check - not overly strict to allow various registries
43    if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44        return Err(ValidationError::new("invalid_image")
45            .with_message(format!("'{}' is not a valid container image", value).into()));
46    }
47    Ok(())
48}
49
50/// Validate a Kubernetes name (RFC 1123 subdomain)
51fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52    if value.is_empty() {
53        return Ok(()); // Empty is allowed for optional fields
54    }
55    if value.len() > 63 {
56        return Err(
57            ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58        );
59    }
60    if !NAME_REGEX.is_match(value) {
61        return Err(ValidationError::new("invalid_name").with_message(
62            format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63        ));
64    }
65    Ok(())
66}
67
68/// Regex for validating POSIX environment variable names:
69/// must start with a letter or underscore, followed by letters, digits, or underscores.
70static ENV_NAME_REGEX: LazyLock<Regex> =
71    LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
72
73/// Characters that are dangerous in shell contexts and must not appear in env var values.
74static SHELL_METACHAR_REGEX: LazyLock<Regex> =
75    LazyLock::new(|| Regex::new(r"[`$;|&<>()\\!]").unwrap());
76
/// Upper bound, in bytes, on a single environment variable value (32 KiB).
const MAX_ENV_VALUE_LEN: usize = 32 * 1024;
79
80/// Validate environment variable names and values.
81///
82/// Enforces:
83/// - Maximum 100 env vars
84/// - POSIX-compliant names (letter/underscore prefix, alphanumeric/underscore body)
85/// - Name length 1–256 chars
86/// - Value length ≤ 32 KiB
87/// - No shell metacharacters in values (backtick, `$`, `;`, `|`, `&`, `<`, `>`, `(`, `)`, `\`, `!`)
88/// - Blocklist of security-sensitive variable names
89fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
90    // Limit number of env vars to prevent resource exhaustion
91    const MAX_ENV_VARS: usize = 100;
92    if vars.len() > MAX_ENV_VARS {
93        return Err(ValidationError::new("too_many_env_vars").with_message(
94            format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
95        ));
96    }
97    for var in vars {
98        // Validate env var name length
99        if var.name.is_empty() || var.name.len() > 256 {
100            return Err(ValidationError::new("invalid_env_name")
101                .with_message("environment variable name must be 1-256 characters".into()));
102        }
103
104        // Validate env var name is POSIX-compliant
105        if !ENV_NAME_REGEX.is_match(&var.name) {
106            return Err(
107                ValidationError::new("invalid_env_name_format").with_message(
108                    format!(
109                        "environment variable name '{}' is not POSIX-compliant \
110                     (must match [a-zA-Z_][a-zA-Z0-9_]*)",
111                        var.name
112                    )
113                    .into(),
114                ),
115            );
116        }
117
118        // Validate env var value if present
119        if let Some(ref value) = var.value {
120            if value.len() > MAX_ENV_VALUE_LEN {
121                return Err(ValidationError::new("env_value_too_long").with_message(
122                    format!(
123                        "environment variable '{}' value exceeds {} byte limit",
124                        var.name, MAX_ENV_VALUE_LEN
125                    )
126                    .into(),
127                ));
128            }
129            if SHELL_METACHAR_REGEX.is_match(value) {
130                return Err(
131                    ValidationError::new("env_value_shell_metachar").with_message(
132                        format!(
133                            "environment variable '{}' value contains shell metacharacters \
134                         (backtick, $, ;, |, &, <, >, (, ), \\, ! are not allowed)",
135                            var.name
136                        )
137                        .into(),
138                    ),
139                );
140            }
141        }
142
143        // Check for dangerous env var names that could override security settings
144        let forbidden_names = [
145            "LD_PRELOAD",
146            "LD_LIBRARY_PATH",
147            "DYLD_INSERT_LIBRARIES",
148            "DYLD_LIBRARY_PATH",
149        ];
150        let forbidden_prefixes = ["LD_AUDIT"];
151        // Check both `value` and `value_from` references.
152        let has_value = var.value.is_some() || var.value_from.is_some();
153        for name in forbidden_names {
154            if var.name == name && has_value {
155                return Err(ValidationError::new("forbidden_env_var").with_message(
156                    format!(
157                        "environment variable '{}' is not allowed for security \
158                         (blocked for both direct values and value_from references)",
159                        var.name
160                    )
161                    .into(),
162                ));
163            }
164        }
165        for prefix in forbidden_prefixes {
166            if var.name.starts_with(prefix) && has_value {
167                return Err(ValidationError::new("forbidden_env_var").with_message(
168                    format!(
169                        "environment variable '{}' is not allowed for security \
170                         (blocked for both direct values and value_from references)",
171                        var.name
172                    )
173                    .into(),
174                ));
175            }
176        }
177    }
178    Ok(())
179}
180
// Spec half of the `RivvenCluster` custom resource. The `CustomResource`
// derive is configured by the `#[kube(...)]` arguments below (group, version,
// kind, status type, printer columns); `Validate` wires up the per-field
// `#[validate(...)]` rules.
//
// NOTE(review): fields tagged `#[schemars(skip)]` (resources, affinity,
// tolerations, env, and both security contexts) are left out of the derived
// JSON schema and have no `#[validate]` rule except `env` — confirm the API
// server does not prune them from stored objects.
/// RivvenCluster custom resource definition
///
/// Represents a Rivven distributed event streaming cluster deployment.
/// The operator watches these resources and reconciles the actual cluster
/// state to match the desired specification.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenCluster",
    plural = "rivvenclusters",
    shortname = "rc",
    namespaced,
    status = "RivvenClusterStatus",
    printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
    printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterSpec {
    /// Number of broker replicas (1-100)
    #[serde(default = "default_replicas")]
    #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
    pub replicas: i32,

    // NOTE(review): the doc below mentions a semver pattern, but the only rule
    // attached here is a 1-64 character length check.
    /// Rivven version to deploy (must match semver pattern)
    #[serde(default = "default_version")]
    #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
    pub version: String,

    // Checked by `validate_optional_image`, which forwards to `validate_image`.
    /// Container image (overrides version-based default)
    /// Must be a valid container image reference
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy (Always, IfNotPresent, Never)
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    // Only the number of entries is validated here; the individual secret
    // names are not checked against the Kubernetes name rules.
    /// Image pull secrets (max 10 secrets)
    #[serde(default)]
    #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
    pub image_pull_secrets: Vec<String>,

    /// Storage configuration
    #[serde(default)]
    #[validate(nested)]
    pub storage: StorageSpec,

    /// Resource requirements (CPU, memory)
    #[serde(default)]
    #[schemars(skip)]
    pub resources: Option<ResourceRequirements>,

    /// Broker configuration parameters
    #[serde(default)]
    #[validate(nested)]
    pub config: BrokerConfig,

    /// TLS configuration
    #[serde(default)]
    #[validate(nested)]
    pub tls: TlsSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: MetricsSpec,

    /// Pod affinity/anti-affinity rules
    #[serde(default)]
    #[schemars(skip)]
    pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,

    // `validate_node_selector` enforces the entry count plus key/value lengths.
    /// Node selector for pod scheduling (max 20 selectors)
    #[serde(default)]
    #[validate(custom(function = "validate_node_selector"))]
    pub node_selector: BTreeMap<String, String>,

    /// Tolerations for pod scheduling
    #[serde(default)]
    #[schemars(skip)]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod disruption budget configuration
    #[serde(default)]
    #[validate(nested)]
    pub pod_disruption_budget: PdbSpec,

    /// Service account name (must be valid K8s name)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Additional pod annotations (max 50)
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    // `validate_labels` also rejects keys under the `app.kubernetes.io/` prefix.
    /// Additional pod labels (max 20)
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Environment variables (validated for security)
    #[serde(default)]
    #[schemars(skip)]
    #[validate(custom(function = "validate_env_vars"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Liveness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub liveness_probe: ProbeSpec,

    /// Readiness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub readiness_probe: ProbeSpec,

    /// Security context for pods
    #[serde(default)]
    #[schemars(skip)]
    pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,

    /// Container security context
    #[serde(default)]
    #[schemars(skip)]
    pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
}
314
315/// Validate optional image reference
316fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
317    validate_image(image)
318}
319
320/// Validate image pull policy
321fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
322    match policy {
323        "Always" | "IfNotPresent" | "Never" => Ok(()),
324        _ => Err(ValidationError::new("invalid_pull_policy")
325            .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
326    }
327}
328
329/// Validate node selector map
330fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
331    if selectors.len() > 20 {
332        return Err(ValidationError::new("too_many_selectors")
333            .with_message("maximum 20 node selectors allowed".into()));
334    }
335    for (key, value) in selectors {
336        if key.len() > 253 || value.len() > 63 {
337            return Err(ValidationError::new("selector_too_long")
338                .with_message("selector key max 253 chars, value max 63 chars".into()));
339        }
340    }
341    Ok(())
342}
343
344/// Validate optional Kubernetes name (for use with Option<String> fields)
345fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
346    if name.is_empty() {
347        return Ok(()); // Empty is allowed for optional fields
348    }
349    validate_k8s_name(name)
350}
351
352/// Validate annotations map
353fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
354    if annotations.len() > 50 {
355        return Err(ValidationError::new("too_many_annotations")
356            .with_message("maximum 50 annotations allowed".into()));
357    }
358    for (key, value) in annotations {
359        // Annotation keys can be up to 253 chars (with optional prefix)
360        if key.len() > 253 {
361            return Err(ValidationError::new("annotation_key_too_long")
362                .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
363        }
364        // Annotation values can be up to 256KB
365        if value.len() > 262144 {
366            return Err(ValidationError::new("annotation_value_too_long")
367                .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
368        }
369    }
370    Ok(())
371}
372
373/// Validate labels map
374fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
375    if labels.len() > 20 {
376        return Err(ValidationError::new("too_many_labels")
377            .with_message("maximum 20 labels allowed".into()));
378    }
379    for (key, value) in labels {
380        if key.len() > 253 || value.len() > 63 {
381            return Err(ValidationError::new("label_too_long")
382                .with_message("label key max 253 chars, value max 63 chars".into()));
383        }
384        // Labels must not override managed labels
385        if key.starts_with("app.kubernetes.io/") {
386            return Err(ValidationError::new("reserved_label").with_message(
387                format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
388            ));
389        }
390    }
391    Ok(())
392}
393
// Persistent storage settings nested under `storage` in the cluster spec.
// Missing fields fall back to `default_storage_size` / `default_access_modes`;
// the hand-written `Default` impl for this type uses the same helpers.
/// Storage specification for broker data
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StorageSpec {
    /// Storage size (e.g., "100Gi") - must be valid Kubernetes quantity
    #[serde(default = "default_storage_size")]
    #[validate(custom(function = "validate_quantity"))]
    pub size: String,

    // `validate_optional_k8s_name` accepts an empty string and otherwise
    // requires a dot-free name of at most 63 bytes.
    /// Storage class name (empty uses default)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub storage_class_name: Option<String>,

    // Two rules apply: the entry count (1-3) and, via `validate_access_modes`,
    // membership of each entry in the set of known access modes.
    /// Access modes for the PVC
    #[serde(default = "default_access_modes")]
    #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
    #[validate(custom(function = "validate_access_modes"))]
    pub access_modes: Vec<String>,
}
414
415/// Validate PVC access modes
416fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
417    let valid_modes = [
418        "ReadWriteOnce",
419        "ReadOnlyMany",
420        "ReadWriteMany",
421        "ReadWriteOncePod",
422    ];
423    for mode in modes {
424        if !valid_modes.contains(&mode.as_str()) {
425            return Err(ValidationError::new("invalid_access_mode")
426                .with_message(format!("'{}' is not a valid access mode", mode).into()));
427        }
428    }
429    Ok(())
430}
431
432impl Default for StorageSpec {
433    fn default() -> Self {
434        Self {
435            size: default_storage_size(),
436            storage_class_name: None,
437            access_modes: default_access_modes(),
438        }
439    }
440}
441
// Broker tuning knobs nested under `config` in the cluster spec. Every field
// has a serde default, so an empty `config: {}` deserializes successfully.
//
// NOTE(review): there is no struct-level rule relating
// `raft_heartbeat_interval_ms` to `raft_election_timeout_ms` — confirm whether
// the heartbeat is expected to be shorter than the election timeout.
/// Broker configuration parameters
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct BrokerConfig {
    /// Default number of partitions for new topics (1-1000)
    #[serde(default = "default_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor for new topics (1-10)
    #[serde(default = "default_replication_factor")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Log retention period in hours (1-8760, i.e., 1 hour to 1 year)
    #[serde(default = "default_log_retention_hours")]
    #[validate(range(
        min = 1,
        max = 8760,
        message = "retention hours must be between 1 and 8760"
    ))]
    pub log_retention_hours: i32,

    // Bounds are enforced by `validate_segment_size`.
    /// Log segment size in bytes (1MB to 10GB)
    #[serde(default = "default_log_segment_bytes")]
    #[validate(custom(function = "validate_segment_size"))]
    pub log_segment_bytes: i64,

    // Bounds are enforced by `validate_message_size`.
    /// Maximum message size in bytes (1KB to 100MB)
    #[serde(default = "default_max_message_bytes")]
    #[validate(custom(function = "validate_message_size"))]
    pub max_message_bytes: i64,

    /// Enable auto topic creation
    #[serde(default = "default_true")]
    pub auto_create_topics: bool,

    /// Enable compression
    #[serde(default = "default_true")]
    pub compression_enabled: bool,

    // NOTE(review): the doc below lists three algorithms, but
    // `validate_compression_type` also accepts "snappy" and "gzip".
    /// Compression algorithm (lz4, zstd, none)
    #[serde(default = "default_compression")]
    #[validate(custom(function = "validate_compression_type"))]
    pub compression_type: String,

    /// Raft election timeout in milliseconds (100-60000)
    #[serde(default = "default_election_timeout")]
    #[validate(range(
        min = 100,
        max = 60000,
        message = "election timeout must be between 100ms and 60s"
    ))]
    pub raft_election_timeout_ms: i32,

    /// Raft heartbeat interval in milliseconds (10-10000)
    #[serde(default = "default_heartbeat_interval")]
    #[validate(range(
        min = 10,
        max = 10000,
        message = "heartbeat interval must be between 10ms and 10s"
    ))]
    pub raft_heartbeat_interval_ms: i32,

    // `validate_raw_config` limits entry count and key/value lengths and
    // rejects a small set of forbidden keys.
    /// Additional raw configuration overrides (max 50 entries)
    #[serde(default)]
    #[validate(custom(function = "validate_raw_config"))]
    pub raw: BTreeMap<String, String>,
}
515
516/// Validate compression type
517fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
518    match compression {
519        "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
520        _ => Err(ValidationError::new("invalid_compression")
521            .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
522    }
523}
524
525/// Validate log segment size (1MB to 10GB)
526fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
527    const MIN_SEGMENT_SIZE: i64 = 1_048_576; // 1MB
528    const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; // 10GB
529    if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
530        return Err(ValidationError::new("invalid_segment_size")
531            .with_message("segment size must be between 1MB and 10GB".into()));
532    }
533    Ok(())
534}
535
536/// Validate max message size (1KB to 100MB)
537fn validate_message_size(size: i64) -> Result<(), ValidationError> {
538    const MIN_MESSAGE_SIZE: i64 = 1_024; // 1KB
539    const MAX_MESSAGE_SIZE: i64 = 104_857_600; // 100MB
540    if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
541        return Err(ValidationError::new("invalid_message_size")
542            .with_message("max message size must be between 1KB and 100MB".into()));
543    }
544    Ok(())
545}
546
547/// Validate raw config map
548fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
549    if config.len() > 50 {
550        return Err(ValidationError::new("too_many_raw_configs")
551            .with_message("maximum 50 raw configuration entries allowed".into()));
552    }
553    for (key, value) in config {
554        if key.len() > 128 || value.len() > 4096 {
555            return Err(ValidationError::new("raw_config_too_long")
556                .with_message("raw config key max 128 chars, value max 4096 chars".into()));
557        }
558        // Prevent injection of dangerous config keys
559        let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
560        if forbidden_keys.contains(&key.as_str()) {
561            return Err(ValidationError::new("forbidden_raw_config")
562                .with_message(format!("raw config key '{}' is not allowed", key).into()));
563        }
564    }
565    Ok(())
566}
567
568impl Default for BrokerConfig {
569    fn default() -> Self {
570        Self {
571            default_partitions: default_partitions(),
572            default_replication_factor: default_replication_factor(),
573            log_retention_hours: default_log_retention_hours(),
574            log_segment_bytes: default_log_segment_bytes(),
575            max_message_bytes: default_max_message_bytes(),
576            auto_create_topics: default_true(),
577            compression_enabled: default_true(),
578            compression_type: default_compression(),
579            raft_election_timeout_ms: default_election_timeout(),
580            raft_heartbeat_interval_ms: default_heartbeat_interval(),
581            raw: BTreeMap::new(),
582        }
583    }
584}
585
// TLS settings nested under `tls` in the cluster spec. The derived `Default`
// yields TLS and mTLS disabled with no secret names. In addition to the
// per-field name checks, `validate_tls_spec` runs as a struct-level rule to
// enforce consistency between the flags and the secret names.
/// TLS configuration
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
#[validate(schema(function = "validate_tls_spec"))]
pub struct TlsSpec {
    /// Enable TLS
    #[serde(default)]
    pub enabled: bool,

    // The per-field validator accepts an empty string; whether a name is
    // required at all is decided by `validate_tls_spec`.
    /// Name of the secret containing TLS certificates (required if enabled)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// Name of the secret containing CA certificate for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,
}
609
610/// Structural validation for [`TlsSpec`].
611///
612/// Ensures logical consistency: `cert_secret_name` must be set when TLS is
613/// enabled, `ca_secret_name` must be set when mTLS is enabled, and mTLS
614/// cannot be enabled without TLS itself being enabled.
615fn validate_tls_spec(spec: &TlsSpec) -> Result<(), validator::ValidationError> {
616    if spec.enabled && spec.cert_secret_name.is_none() {
617        return Err(validator::ValidationError::new("tls_cert_required"));
618    }
619    if spec.mtls_enabled && !spec.enabled {
620        return Err(validator::ValidationError::new("mtls_requires_tls"));
621    }
622    if spec.mtls_enabled && spec.ca_secret_name.is_none() {
623        return Err(validator::ValidationError::new("mtls_ca_required"));
624    }
625    Ok(())
626}
627
// Metrics settings nested under `metrics` in the cluster spec. `Default` is
// implemented by hand for this type rather than derived.
/// Metrics configuration
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MetricsSpec {
    /// Enable Prometheus metrics
    #[serde(default = "default_true")]
    pub enabled: bool,

    // The range rule below excludes ports under 1024.
    /// Metrics port (1024-65535, must be unprivileged)
    #[serde(default = "default_metrics_port")]
    #[validate(range(
        min = 1024,
        max = 65535,
        message = "metrics port must be between 1024 and 65535"
    ))]
    pub port: i32,

    // When `serviceMonitor` is omitted, serde fills this field with
    // `ServiceMonitorSpec::default()`, which is then validated as a nested value.
    /// ServiceMonitor configuration for Prometheus Operator
    #[serde(default)]
    #[validate(nested)]
    pub service_monitor: ServiceMonitorSpec,
}
650
651impl Default for MetricsSpec {
652    fn default() -> Self {
653        Self {
654            enabled: true,
655            port: 9090,
656            service_monitor: ServiceMonitorSpec::default(),
657        }
658    }
659}
660
661/// ServiceMonitor configuration for Prometheus Operator integration
662#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
663#[serde(rename_all = "camelCase")]
664pub struct ServiceMonitorSpec {
665    /// Create a ServiceMonitor resource
666    #[serde(default)]
667    pub enabled: bool,
668
669    /// Namespace for the ServiceMonitor (defaults to cluster namespace)
670    #[serde(default)]
671    #[validate(custom(function = "validate_optional_k8s_name"))]
672    pub namespace: Option<String>,
673
674    /// Scrape interval (must match Prometheus duration format)
675    #[serde(default = "default_scrape_interval")]
676    #[validate(custom(function = "validate_duration"))]
677    pub interval: String,
678
679    /// Additional labels for the ServiceMonitor (max 10)
680    #[serde(default)]
681    #[validate(custom(function = "validate_service_monitor_labels"))]
682    pub labels: BTreeMap<String, String>,
683}
684
685/// Validate Prometheus duration format (e.g., "30s", "1m", "5m30s")
686fn validate_duration(duration: &str) -> Result<(), ValidationError> {
687    static DURATION_REGEX: LazyLock<Regex> =
688        LazyLock::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
689    if !DURATION_REGEX.is_match(duration) {
690        return Err(ValidationError::new("invalid_duration").with_message(
691            format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
692        ));
693    }
694    Ok(())
695}
696
697/// Validate ServiceMonitor labels
698fn validate_service_monitor_labels(
699    labels: &BTreeMap<String, String>,
700) -> Result<(), ValidationError> {
701    if labels.len() > 10 {
702        return Err(ValidationError::new("too_many_labels")
703            .with_message("maximum 10 ServiceMonitor labels allowed".into()));
704    }
705    Ok(())
706}
707
// PodDisruptionBudget settings nested under `podDisruptionBudget` in the
// cluster spec.
//
// NOTE(review): the field docs describe `min_available` and `max_unavailable`
// as mutually exclusive, but no struct-level rule enforces that here, and the
// hand-written `Default` impl sets `max_unavailable` to "1" — confirm how the
// consumer resolves the case where both are set.
/// Pod Disruption Budget configuration
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PdbSpec {
    /// Enable PDB creation
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Minimum available pods (mutually exclusive with maxUnavailable)
    /// Can be an integer or percentage (e.g., "50%")
    #[serde(default)]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub min_available: Option<String>,

    /// Maximum unavailable pods
    /// Can be an integer or percentage (e.g., "25%")
    #[serde(default = "default_max_unavailable")]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub max_unavailable: Option<String>,
}
728
729/// Validate integer or percentage string (for Option<String> fields)
730fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
731    if value.is_empty() {
732        return Ok(());
733    }
734    // Allow integers or percentages
735    static INT_OR_PERCENT_REGEX: LazyLock<Regex> =
736        LazyLock::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
737    if !INT_OR_PERCENT_REGEX.is_match(value) {
738        return Err(ValidationError::new("invalid_int_or_percent").with_message(
739            format!(
740                "'{}' must be an integer or percentage (e.g., '1' or '25%')",
741                value
742            )
743            .into(),
744        ));
745    }
746    Ok(())
747}
748
749impl Default for PdbSpec {
750    fn default() -> Self {
751        Self {
752            enabled: true,
753            min_available: None,
754            max_unavailable: Some("1".to_string()),
755        }
756    }
757}
758
// Shared shape for the `livenessProbe` and `readinessProbe` sections of the
// cluster spec. Every numeric field carries an inclusive range rule, and
// `Default` is implemented by hand for this type.
/// Probe configuration for liveness/readiness
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ProbeSpec {
    /// Enable the probe
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Initial delay before first probe (0-3600 seconds)
    #[serde(default = "default_initial_delay")]
    #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
    pub initial_delay_seconds: i32,

    /// Period between probes (1-300 seconds)
    #[serde(default = "default_period")]
    #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
    pub period_seconds: i32,

    /// Timeout for probe (1-60 seconds)
    #[serde(default = "default_timeout")]
    #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
    pub timeout_seconds: i32,

    /// Success threshold (1-10)
    #[serde(default = "default_one")]
    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
    pub success_threshold: i32,

    /// Failure threshold (1-30)
    #[serde(default = "default_three")]
    #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
    pub failure_threshold: i32,
}
792
793impl Default for ProbeSpec {
794    fn default() -> Self {
795        Self {
796            enabled: true,
797            initial_delay_seconds: 30,
798            period_seconds: 10,
799            timeout_seconds: 5,
800            success_threshold: 1,
801            failure_threshold: 3,
802        }
803    }
804}
805
// Status half of the `RivvenCluster` resource (referenced by
// `status = "RivvenClusterStatus"` on the spec). Field names are serialized in
// camelCase; the printer columns on the spec read `readyReplicas` and `phase`
// from here.
//
// NOTE(review): `phase`, the replica counters, and `observed_generation`
// carry no `#[serde(default)]` — confirm that partially populated status
// objects never need to be deserialized.
/// Status of a RivvenCluster resource
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterStatus {
    /// Current phase of the cluster
    pub phase: ClusterPhase,

    /// Total number of replicas
    pub replicas: i32,

    /// Number of ready replicas
    pub ready_replicas: i32,

    /// Number of updated replicas
    pub updated_replicas: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing cluster state
    #[serde(default)]
    pub conditions: Vec<ClusterCondition>,

    /// Broker endpoints
    #[serde(default)]
    pub broker_endpoints: Vec<String>,

    /// Current leader broker (if known)
    pub leader: Option<String>,

    // Stored as a plain string; the timestamp format is not fixed by this type.
    /// Last time the status was updated
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
842
843/// Phase of the cluster lifecycle
844#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
845pub enum ClusterPhase {
846    /// Cluster is being created
847    #[default]
848    Pending,
849    /// Cluster is being provisioned
850    Provisioning,
851    /// Cluster is running and healthy
852    Running,
853    /// Cluster is updating/rolling
854    Updating,
855    /// Cluster is in degraded state
856    Degraded,
857    /// Cluster has failed
858    Failed,
859    /// Cluster is being deleted
860    Terminating,
861}
862
863/// Condition describing an aspect of cluster state
864#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
865#[serde(rename_all = "camelCase")]
866pub struct ClusterCondition {
867    /// Type of condition
868    #[serde(rename = "type")]
869    pub condition_type: String,
870
871    /// Status of the condition (True, False, Unknown)
872    pub status: String,
873
874    /// Reason for the condition
875    pub reason: Option<String>,
876
877    /// Human-readable message
878    pub message: Option<String>,
879
880    /// Last transition time
881    pub last_transition_time: Option<String>,
882}
883
// Default value functions
//
// Each helper returns a fixed constant. Where a use is visible in this file,
// it is as the target of a `#[serde(default = "...")]` attribute.

/// Returns `3`. By its name, a default replica count (use site not shown here).
fn default_replicas() -> i32 {
    3
}

/// Returns `"0.0.1"`; serde default for `RivvenConnectSpec::version`.
fn default_version() -> String {
    String::from("0.0.1")
}

/// Returns `"IfNotPresent"`; serde default for `RivvenConnectSpec::image_pull_policy`.
fn default_image_pull_policy() -> String {
    String::from("IfNotPresent")
}

/// Returns `"10Gi"`.
fn default_storage_size() -> String {
    String::from("10Gi")
}

/// Returns a one-element list containing `"ReadWriteOnce"`.
fn default_access_modes() -> Vec<String> {
    vec![String::from("ReadWriteOnce")]
}

/// Returns `3`.
fn default_partitions() -> i32 {
    3
}

/// Returns `2`.
fn default_replication_factor() -> i32 {
    2
}

/// Returns `168` hours, i.e. 7 days.
fn default_log_retention_hours() -> i32 {
    7 * 24
}

/// Returns `1073741824` bytes (1 GiB).
fn default_log_segment_bytes() -> i64 {
    1024 * 1024 * 1024
}

/// Returns `1048576` bytes (1 MiB); serde default for `TopicConfig::max_message_bytes`.
fn default_max_message_bytes() -> i64 {
    1024 * 1024
}

/// Returns `"lz4"`.
fn default_compression() -> String {
    String::from("lz4")
}

/// Returns `1000` (unit not stated here; presumably milliseconds, confirm at the use site).
fn default_election_timeout() -> i32 {
    1_000
}

/// Returns `100` (unit not stated here; presumably milliseconds, confirm at the use site).
fn default_heartbeat_interval() -> i32 {
    100
}

/// Returns `9090`.
fn default_metrics_port() -> i32 {
    9_090
}

/// Returns `"30s"`.
fn default_scrape_interval() -> String {
    String::from("30s")
}

/// Returns `Some("1")`.
fn default_max_unavailable() -> Option<String> {
    Some(String::from("1"))
}

/// Returns `30`.
fn default_initial_delay() -> i32 {
    30
}

/// Returns `10`; serde default for the probe `period_seconds` field.
fn default_period() -> i32 {
    10
}

/// Returns `5`; serde default for the probe `timeout_seconds` field.
fn default_timeout() -> i32 {
    5
}

/// Returns `1`; serde default for the probe `success_threshold` field.
fn default_one() -> i32 {
    1
}

/// Returns `3`; serde default for the probe `failure_threshold` field.
fn default_three() -> i32 {
    3
}

/// Returns `true`; serde default for boolean fields that are on unless disabled.
fn default_true() -> bool {
    true
}
972
973impl RivvenClusterSpec {
974    /// Get the full container image including version
975    pub fn get_image(&self) -> String {
976        if let Some(ref image) = self.image {
977            image.clone()
978        } else {
979            format!("ghcr.io/hupe1980/rivven:{}", self.version)
980        }
981    }
982
983    /// Get labels for managed resources
984    pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
985        let mut labels = BTreeMap::new();
986        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
987        labels.insert(
988            "app.kubernetes.io/instance".to_string(),
989            cluster_name.to_string(),
990        );
991        labels.insert(
992            "app.kubernetes.io/component".to_string(),
993            "broker".to_string(),
994        );
995        labels.insert(
996            "app.kubernetes.io/managed-by".to_string(),
997            "rivven-operator".to_string(),
998        );
999        labels.insert(
1000            "app.kubernetes.io/version".to_string(),
1001            self.version.clone(),
1002        );
1003        labels
1004    }
1005
1006    /// Get selector labels for managed resources
1007    pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
1008        let mut labels = BTreeMap::new();
1009        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
1010        labels.insert(
1011            "app.kubernetes.io/instance".to_string(),
1012            cluster_name.to_string(),
1013        );
1014        labels
1015    }
1016}
1017
1018// ============================================================================
1019// RivvenConnect CRD - Connector Framework for Rivven
1020// ============================================================================
1021
1022/// RivvenConnect custom resource for managing connectors
1023///
1024/// This CRD allows declarative management of Rivven Connect pipelines,
1025/// including source connectors (CDC, HTTP, etc.) and sink connectors
1026/// (S3, stdout, HTTP webhooks, etc.).
1027#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1028#[kube(
1029    group = "rivven.hupe1980.github.io",
1030    version = "v1alpha1",
1031    kind = "RivvenConnect",
1032    plural = "rivvenconnects",
1033    shortname = "rcon",
1034    namespaced,
1035    status = "RivvenConnectStatus",
1036    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
1037    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
1038    printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
1039    printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
1040    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
1041    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
1042)]
1043#[serde(rename_all = "camelCase")]
1044pub struct RivvenConnectSpec {
1045    /// Reference to the RivvenCluster this connect instance connects to
1046    #[validate(nested)]
1047    pub cluster_ref: ClusterReference,
1048
1049    /// Number of connect worker replicas (1-10)
1050    #[serde(default = "default_connect_replicas")]
1051    #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
1052    pub replicas: i32,
1053
1054    /// Connect image version
1055    #[serde(default = "default_version")]
1056    pub version: String,
1057
1058    /// Custom container image (overrides version-based default)
1059    #[serde(default)]
1060    #[validate(custom(function = "validate_optional_image"))]
1061    pub image: Option<String>,
1062
1063    /// Image pull policy
1064    #[serde(default = "default_image_pull_policy")]
1065    #[validate(custom(function = "validate_pull_policy"))]
1066    pub image_pull_policy: String,
1067
1068    /// Image pull secrets
1069    #[serde(default)]
1070    pub image_pull_secrets: Vec<String>,
1071
1072    /// Resource requests/limits (following k8s ResourceRequirements schema)
1073    #[serde(default)]
1074    pub resources: Option<serde_json::Value>,
1075
1076    /// Global connect configuration
1077    #[serde(default)]
1078    #[validate(nested)]
1079    pub config: ConnectConfigSpec,
1080
1081    /// Source connectors (read from external systems, publish to Rivven)
1082    #[serde(default)]
1083    #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
1084    pub sources: Vec<SourceConnectorSpec>,
1085
1086    /// Sink connectors (consume from Rivven, write to external systems)
1087    #[serde(default)]
1088    #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
1089    pub sinks: Vec<SinkConnectorSpec>,
1090
1091    /// Global settings for all connectors
1092    #[serde(default)]
1093    #[validate(nested)]
1094    pub settings: GlobalConnectSettings,
1095
1096    /// TLS configuration for broker connection
1097    #[serde(default)]
1098    #[validate(nested)]
1099    pub tls: ConnectTlsSpec,
1100
1101    /// Pod annotations
1102    #[serde(default)]
1103    #[validate(custom(function = "validate_annotations"))]
1104    pub pod_annotations: BTreeMap<String, String>,
1105
1106    /// Pod labels (cannot override app.kubernetes.io/* labels)
1107    #[serde(default)]
1108    #[validate(custom(function = "validate_labels"))]
1109    pub pod_labels: BTreeMap<String, String>,
1110
1111    /// Environment variables for the container
1112    #[serde(default)]
1113    #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
1114    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
1115
1116    /// Node selector for pod scheduling
1117    #[serde(default)]
1118    pub node_selector: BTreeMap<String, String>,
1119
1120    /// Pod tolerations
1121    #[serde(default)]
1122    #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
1123    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
1124
1125    /// Pod affinity rules
1126    #[serde(default)]
1127    pub affinity: Option<serde_json::Value>,
1128
1129    /// Service account name
1130    #[serde(default)]
1131    #[validate(custom(function = "validate_optional_k8s_name"))]
1132    pub service_account: Option<String>,
1133
1134    /// Pod security context
1135    #[serde(default)]
1136    pub security_context: Option<serde_json::Value>,
1137
1138    /// Container security context
1139    #[serde(default)]
1140    pub container_security_context: Option<serde_json::Value>,
1141}
1142
1143/// Reference to a RivvenCluster
1144#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1145#[serde(rename_all = "camelCase")]
1146pub struct ClusterReference {
1147    /// Name of the RivvenCluster
1148    #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
1149    #[validate(custom(function = "validate_k8s_name"))]
1150    pub name: String,
1151
1152    /// Namespace of the RivvenCluster (defaults to same namespace)
1153    #[serde(default)]
1154    #[validate(custom(function = "validate_optional_k8s_name"))]
1155    pub namespace: Option<String>,
1156}
1157
1158// ============================================================================
1159// RivvenTopic CRD - Declarative Topic Management
1160// ============================================================================
1161
1162/// RivvenTopic custom resource for declarative topic management
1163///
1164/// This CRD allows users to define topics as Kubernetes resources,
1165/// enabling GitOps workflows for topic lifecycle management.
1166///
1167/// # Example
1168///
1169/// ```yaml
1170/// apiVersion: rivven.hupe1980.github.io/v1alpha1
1171/// kind: RivvenTopic
1172/// metadata:
1173///   name: orders-events
1174///   namespace: production
1175/// spec:
1176///   clusterRef:
1177///     name: my-rivven-cluster
1178///   partitions: 12
1179///   replicationFactor: 3
1180///   config:
1181///     retentionMs: 604800000   # 7 days
1182///     cleanupPolicy: delete
1183///     compressionType: lz4
1184///   acls:
1185///     - principal: "user:order-service"
1186///       operations: ["Read", "Write"]
1187///     - principal: "user:analytics"
1188///       operations: ["Read"]
1189/// ```
1190// Note: These types are defined for CRD generation and future controller use.
1191// They are intentionally not yet constructed - the controller will use them.
1192#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1193#[kube(
1194    group = "rivven.hupe1980.github.io",
1195    version = "v1alpha1",
1196    kind = "RivvenTopic",
1197    plural = "rivventopics",
1198    shortname = "rt",
1199    namespaced,
1200    status = "RivvenTopicStatus",
1201    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
1202    printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
1203    printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
1204    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
1205    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
1206)]
1207#[serde(rename_all = "camelCase")]
1208pub struct RivvenTopicSpec {
1209    /// Reference to the RivvenCluster this topic belongs to
1210    #[validate(nested)]
1211    pub cluster_ref: ClusterReference,
1212
1213    /// Number of partitions for the topic (1-10000)
1214    /// Cannot be decreased after creation
1215    #[serde(default = "default_rivven_topic_partitions")]
1216    #[validate(range(
1217        min = 1,
1218        max = 10000,
1219        message = "partitions must be between 1 and 10000"
1220    ))]
1221    pub partitions: i32,
1222
1223    /// Replication factor for the topic (1-10)
1224    /// Must not exceed the number of brokers in the cluster
1225    #[serde(default = "default_rivven_topic_replication")]
1226    #[validate(range(
1227        min = 1,
1228        max = 10,
1229        message = "replication factor must be between 1 and 10"
1230    ))]
1231    pub replication_factor: i32,
1232
1233    /// Topic configuration parameters
1234    #[serde(default)]
1235    #[validate(nested)]
1236    pub config: TopicConfig,
1237
1238    /// Access control list entries for the topic
1239    #[serde(default)]
1240    #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
1241    #[validate(custom(function = "validate_topic_acls"))]
1242    pub acls: Vec<TopicAcl>,
1243
1244    /// Whether to delete the topic from Rivven when the CRD is deleted
1245    /// Default: true (topic is deleted when CRD is removed)
1246    #[serde(default = "default_true")]
1247    pub delete_on_remove: bool,
1248
1249    /// Labels to apply to the topic metadata
1250    #[serde(default)]
1251    #[validate(custom(function = "validate_labels"))]
1252    pub topic_labels: BTreeMap<String, String>,
1253}
1254
/// Returns `3`; serde default for `RivvenTopicSpec::partitions`.
fn default_rivven_topic_partitions() -> i32 {
    const PARTITIONS: i32 = 3;
    PARTITIONS
}

/// Returns `1`; serde default for `RivvenTopicSpec::replication_factor`.
fn default_rivven_topic_replication() -> i32 {
    const REPLICATION_FACTOR: i32 = 1;
    REPLICATION_FACTOR
}
1262
1263/// Topic configuration parameters
1264#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1265#[serde(rename_all = "camelCase")]
1266pub struct TopicConfig {
1267    /// Retention time in milliseconds (1 hour to 10 years)
1268    /// Default: 604800000 (7 days)
1269    #[serde(default = "default_topic_retention_ms")]
1270    #[validate(range(
1271        min = 3600000,
1272        max = 315360000000_i64,
1273        message = "retention must be between 1 hour and 10 years"
1274    ))]
1275    pub retention_ms: i64,
1276
1277    /// Retention size in bytes per partition (-1 for unlimited)
1278    /// Default: -1 (unlimited)
1279    #[serde(default = "default_topic_retention_bytes")]
1280    #[validate(custom(function = "validate_topic_retention_bytes"))]
1281    pub retention_bytes: i64,
1282
1283    /// Segment size in bytes (1MB to 10GB)
1284    #[serde(default = "default_topic_segment_bytes")]
1285    #[validate(custom(function = "validate_segment_size"))]
1286    pub segment_bytes: i64,
1287
1288    /// Cleanup policy: "delete", "compact", or "delete,compact"
1289    #[serde(default = "default_topic_cleanup_policy")]
1290    #[validate(custom(function = "validate_topic_cleanup_policy"))]
1291    pub cleanup_policy: String,
1292
1293    /// Compression type: "none", "gzip", "snappy", "lz4", "zstd"
1294    #[serde(default = "default_topic_compression")]
1295    #[validate(custom(function = "validate_topic_compression"))]
1296    pub compression_type: String,
1297
1298    /// Minimum number of in-sync replicas for writes (1-10)
1299    #[serde(default = "default_topic_min_isr")]
1300    #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1301    pub min_insync_replicas: i32,
1302
1303    /// Maximum message size in bytes (1KB to 100MB)
1304    #[serde(default = "default_max_message_bytes")]
1305    #[validate(custom(function = "validate_message_size"))]
1306    pub max_message_bytes: i64,
1307
1308    /// Whether to enable message timestamps
1309    #[serde(default = "default_true")]
1310    pub message_timestamp_enabled: bool,
1311
1312    /// Timestamp type: "CreateTime" or "LogAppendTime"
1313    #[serde(default = "default_topic_timestamp_type")]
1314    #[validate(custom(function = "validate_topic_timestamp_type"))]
1315    pub message_timestamp_type: String,
1316
1317    /// Whether to enable idempotent writes
1318    #[serde(default = "default_true")]
1319    pub idempotent_writes: bool,
1320
1321    /// Flush interval in milliseconds (0 for no scheduled flush)
1322    #[serde(default)]
1323    #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1324    pub flush_interval_ms: i64,
1325
1326    /// Custom configuration key-value pairs
1327    #[serde(default)]
1328    #[validate(custom(function = "validate_topic_custom_config"))]
1329    pub custom: BTreeMap<String, String>,
1330}
1331
/// Returns `604800000` ms (7 days); serde default for `TopicConfig::retention_ms`.
fn default_topic_retention_ms() -> i64 {
    7 * 24 * 60 * 60 * 1000
}

/// Returns `-1` (unlimited); serde default for `TopicConfig::retention_bytes`.
fn default_topic_retention_bytes() -> i64 {
    -1
}

/// Returns `1073741824` bytes (1GB); serde default for `TopicConfig::segment_bytes`.
fn default_topic_segment_bytes() -> i64 {
    1024 * 1024 * 1024
}

/// Returns `"delete"`; serde default for `TopicConfig::cleanup_policy`.
fn default_topic_cleanup_policy() -> String {
    String::from("delete")
}

/// Returns `"lz4"`; serde default for `TopicConfig::compression_type`.
fn default_topic_compression() -> String {
    String::from("lz4")
}

/// Returns `1`; serde default for `TopicConfig::min_insync_replicas`.
fn default_topic_min_isr() -> i32 {
    1
}

/// Returns `"CreateTime"`; serde default for `TopicConfig::message_timestamp_type`.
fn default_topic_timestamp_type() -> String {
    String::from("CreateTime")
}
1359
1360fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1361    if value == -1 || (1048576..=10995116277760).contains(&value) {
1362        Ok(())
1363    } else {
1364        Err(ValidationError::new("invalid_retention_bytes")
1365            .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1366    }
1367}
1368
1369fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1370    match policy {
1371        "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1372        _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1373            "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1374        )),
1375    }
1376}
1377
1378fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1379    match compression {
1380        "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1381        _ => Err(ValidationError::new("invalid_compression")
1382            .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1383    }
1384}
1385
1386fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1387    match ts_type {
1388        "CreateTime" | "LogAppendTime" => Ok(()),
1389        _ => Err(ValidationError::new("invalid_timestamp_type")
1390            .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1391    }
1392}
1393
1394fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1395    if config.len() > 50 {
1396        return Err(ValidationError::new("too_many_custom_configs")
1397            .with_message("maximum 50 custom config entries allowed".into()));
1398    }
1399    for (key, value) in config {
1400        if key.len() > 128 || value.len() > 4096 {
1401            return Err(ValidationError::new("config_too_long")
1402                .with_message("config key max 128 chars, value max 4096 chars".into()));
1403        }
1404        // Prevent overriding protected configs
1405        let protected = [
1406            "retention.ms",
1407            "retention.bytes",
1408            "segment.bytes",
1409            "cleanup.policy",
1410        ];
1411        if protected.contains(&key.as_str()) {
1412            return Err(ValidationError::new("protected_config").with_message(
1413                format!(
1414                    "'{}' must be set via dedicated field, not custom config",
1415                    key
1416                )
1417                .into(),
1418            ));
1419        }
1420    }
1421    Ok(())
1422}
1423
1424/// Topic ACL entry
1425#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1426#[serde(rename_all = "camelCase")]
1427pub struct TopicAcl {
1428    /// Principal (e.g., "user:myuser", "group:mygroup", "*")
1429    #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
1430    #[validate(custom(function = "validate_principal"))]
1431    pub principal: String,
1432
1433    /// Allowed operations: Read, Write, Create, Delete, Alter, Describe, All
1434    #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
1435    #[validate(custom(function = "validate_operations"))]
1436    pub operations: Vec<String>,
1437
1438    /// Permission type: Allow or Deny (default: Allow)
1439    #[serde(default = "default_permission_type")]
1440    #[validate(custom(function = "validate_permission_type"))]
1441    pub permission_type: String,
1442
1443    /// Host restriction (default: "*" for any host)
1444    #[serde(default = "default_acl_host")]
1445    #[validate(length(max = 256, message = "host must be max 256 characters"))]
1446    pub host: String,
1447}
1448
/// Returns `"Allow"`; serde default for `TopicAcl::permission_type`.
fn default_permission_type() -> String {
    String::from("Allow")
}

/// Returns `"*"`; serde default for `TopicAcl::host`.
fn default_acl_host() -> String {
    String::from("*")
}
1456
1457fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1458    if principal == "*" {
1459        return Ok(());
1460    }
1461    if let Some((prefix, name)) = principal.split_once(':') {
1462        if !["user", "group", "User", "Group"].contains(&prefix) {
1463            return Err(ValidationError::new("invalid_principal_prefix")
1464                .with_message("principal prefix must be 'user:' or 'group:'".into()));
1465        }
1466        if name.is_empty() || name.len() > 128 {
1467            return Err(ValidationError::new("invalid_principal_name")
1468                .with_message("principal name must be 1-128 characters".into()));
1469        }
1470        Ok(())
1471    } else {
1472        Err(ValidationError::new("invalid_principal_format")
1473            .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1474    }
1475}
1476
1477fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1478    let valid_ops = [
1479        "Read",
1480        "Write",
1481        "Create",
1482        "Delete",
1483        "Alter",
1484        "Describe",
1485        "All",
1486        "DescribeConfigs",
1487        "AlterConfigs",
1488    ];
1489    for op in ops {
1490        if !valid_ops.contains(&op.as_str()) {
1491            return Err(ValidationError::new("invalid_operation").with_message(
1492                format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1493            ));
1494        }
1495    }
1496    Ok(())
1497}
1498
1499fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1500    match perm {
1501        "Allow" | "Deny" => Ok(()),
1502        _ => Err(ValidationError::new("invalid_permission_type")
1503            .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1504    }
1505}
1506
1507fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1508    // Check for duplicate principal+operation combinations
1509    let mut seen = std::collections::HashSet::new();
1510    for acl in acls {
1511        for op in &acl.operations {
1512            let key = format!("{}:{}", acl.principal, op);
1513            if !seen.insert(key.clone()) {
1514                return Err(ValidationError::new("duplicate_acl")
1515                    .with_message(format!("duplicate ACL entry for {}", key).into()));
1516            }
1517        }
1518    }
1519    Ok(())
1520}
1521
1522/// Status of the RivvenTopic resource
1523#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1524#[serde(rename_all = "camelCase")]
1525pub struct RivvenTopicStatus {
1526    /// Current phase: Pending, Creating, Ready, Updating, Deleting, Failed
1527    #[serde(default)]
1528    pub phase: String,
1529
1530    /// Human-readable message about current state
1531    #[serde(default)]
1532    pub message: String,
1533
1534    /// Actual number of partitions (may differ during scaling)
1535    #[serde(default)]
1536    pub current_partitions: i32,
1537
1538    /// Actual replication factor
1539    #[serde(default)]
1540    pub current_replication_factor: i32,
1541
1542    /// Whether the topic exists in Rivven
1543    #[serde(default)]
1544    pub topic_exists: bool,
1545
1546    /// Observed generation for tracking spec changes
1547    #[serde(default)]
1548    pub observed_generation: i64,
1549
1550    /// Conditions for detailed status tracking
1551    #[serde(default)]
1552    pub conditions: Vec<TopicCondition>,
1553
1554    /// Last time the topic was successfully synced
1555    #[serde(default)]
1556    pub last_sync_time: Option<String>,
1557
1558    /// Partition leader information
1559    #[serde(default)]
1560    pub partition_info: Vec<PartitionInfo>,
1561}
1562
1563/// Condition for tracking topic status
1564#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1565#[serde(rename_all = "camelCase")]
1566pub struct TopicCondition {
1567    /// Type of condition: Ready, Synced, ACLsApplied, ConfigApplied
1568    pub r#type: String,
1569
1570    /// Status: True, False, Unknown
1571    pub status: String,
1572
1573    /// Machine-readable reason
1574    pub reason: String,
1575
1576    /// Human-readable message
1577    pub message: String,
1578
1579    /// Last transition time
1580    pub last_transition_time: String,
1581}
1582
1583/// Information about a topic partition
1584#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1585#[serde(rename_all = "camelCase")]
1586pub struct PartitionInfo {
1587    /// Partition ID
1588    pub partition: i32,
1589
1590    /// Leader broker ID
1591    pub leader: i32,
1592
1593    /// Replica broker IDs
1594    pub replicas: Vec<i32>,
1595
1596    /// In-sync replica broker IDs
1597    pub isr: Vec<i32>,
1598}
1599
1600/// Global connect configuration
1601#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1602#[serde(rename_all = "camelCase")]
1603pub struct ConnectConfigSpec {
1604    /// State directory path inside container
1605    #[serde(default = "default_state_dir")]
1606    pub state_dir: String,
1607
1608    /// Log level (trace, debug, info, warn, error)
1609    #[serde(default = "default_log_level")]
1610    #[validate(custom(function = "validate_log_level"))]
1611    pub log_level: String,
1612}
1613
/// Returns `"/data/connect-state"`; serde default for `ConnectConfigSpec::state_dir`.
fn default_state_dir() -> String {
    String::from("/data/connect-state")
}

/// Returns `"info"`; serde default for `ConnectConfigSpec::log_level`.
fn default_log_level() -> String {
    String::from("info")
}
1621
1622fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1623    match level {
1624        "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1625        _ => Err(ValidationError::new("invalid_log_level")
1626            .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1627    }
1628}
1629
1630/// Source connector specification (Kafka Connect style - generic config)
1631///
1632/// All connector-specific configuration goes in the generic `config` field.
1633/// Connector validation happens at runtime, not at CRD schema level.
1634/// This design scales to 300+ connectors without CRD changes.
1635#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1636#[serde(rename_all = "camelCase")]
1637pub struct SourceConnectorSpec {
1638    /// Unique name for this source connector
1639    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
1640    #[validate(custom(function = "validate_k8s_name"))]
1641    pub name: String,
1642
1643    /// Connector type (postgres-cdc, mysql-cdc, http, datagen, kafka, mqtt, etc.)
1644    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
1645    #[validate(custom(function = "validate_connector_type"))]
1646    pub connector: String,
1647
1648    /// Target topic to publish events to (fallback/default)
1649    #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
1650    pub topic: String,
1651
1652    /// Topic routing pattern for CDC connectors
1653    /// Enables dynamic topic selection based on CDC event metadata.
1654    /// Supported placeholders: {database}, {schema}, {table}
1655    /// Example: "cdc.{schema}.{table}" → "cdc.public.users"
1656    #[serde(default)]
1657    pub topic_routing: Option<String>,
1658
1659    /// Whether this source is enabled
1660    #[serde(default = "default_true")]
1661    pub enabled: bool,
1662
1663    /// Connector-specific configuration (Kafka Connect style)
1664    /// All connector parameters go here. Validated at runtime by the controller.
1665    /// Example for postgres-cdc: {"host": "...", "slot": "...", "tables": [...]}
1666    #[serde(default)]
1667    pub config: serde_json::Value,
1668
1669    /// Secret reference for sensitive configuration (passwords, keys, tokens)
1670    /// The referenced Secret's data will be merged into config at runtime.
1671    #[serde(default)]
1672    #[validate(custom(function = "validate_optional_k8s_name"))]
1673    pub config_secret_ref: Option<String>,
1674
1675    /// Topic configuration (partitions, replication)
1676    #[serde(default)]
1677    #[validate(nested)]
1678    pub topic_config: SourceTopicConfigSpec,
1679}
1680
1681fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1682    // Allow standard connectors and custom ones (must be alphanumeric with hyphens)
1683    static CONNECTOR_REGEX: LazyLock<Regex> =
1684        LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1685    if !CONNECTOR_REGEX.is_match(connector) {
1686        return Err(ValidationError::new("invalid_connector_type")
1687            .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1688    }
1689    Ok(())
1690}
1691
1692/// Table specification for CDC sources
1693#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1694#[serde(rename_all = "camelCase")]
1695pub struct TableSpec {
1696    /// Schema/namespace (e.g., "public" for PostgreSQL)
1697    #[serde(default)]
1698    pub schema: Option<String>,
1699
1700    /// Table name
1701    #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
1702    pub table: String,
1703
1704    /// Override topic for this specific table
1705    #[serde(default)]
1706    pub topic: Option<String>,
1707
1708    /// Columns to include (empty = all columns)
1709    #[serde(default)]
1710    #[validate(length(max = 500, message = "maximum 500 columns per table"))]
1711    pub columns: Vec<String>,
1712
1713    /// Columns to exclude
1714    #[serde(default)]
1715    #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
1716    pub exclude_columns: Vec<String>,
1717
1718    /// Column masking rules (column -> mask pattern)
1719    #[serde(default)]
1720    pub column_masks: std::collections::BTreeMap<String, String>,
1721}
1722
/// Source topic configuration
///
/// Every field is optional and deserializes to `None` when omitted, so the
/// derived `Default` (all `None`) matches an empty configuration object.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceTopicConfigSpec {
    /// Number of partitions for auto-created topics
    ///
    /// When present, must be between 1 and 1000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub partitions: Option<i32>,

    /// Replication factor for auto-created topics
    ///
    /// When present, must be between 1 and 10.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: Option<i32>,

    /// Auto-create topics if they don't exist
    ///
    /// NOTE(review): the behavior when this is `None` is decided by the
    /// consumer of this spec, which is not visible here.
    #[serde(default)]
    pub auto_create: Option<bool>,
}
1745
/// Sink connector specification (Kafka Connect style - generic config)
///
/// All connector-specific configuration goes in the generic `config` field.
/// Connector validation happens at runtime, not at CRD schema level.
/// This design scales to 300+ connectors without CRD changes.
///
/// `name`, `connector`, `topics` and `consumer_group` are required; every
/// other field has a serde default.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SinkConnectorSpec {
    /// Unique name for this sink connector
    ///
    /// Must be 1-63 characters and pass `validate_k8s_name`.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type (stdout, s3, http, iceberg, delta, elasticsearch, etc.)
    ///
    /// Must be 1-64 characters and pass `validate_connector_type`
    /// (lowercase alphanumeric with hyphens).
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Topics to consume from (supports wildcards like "cdc.*")
    ///
    /// Between 1 and 100 entries. Only the list length is validated here;
    /// individual topic strings are not checked.
    #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
    pub topics: Vec<String>,

    /// Consumer group for offset tracking (1-128 characters)
    #[validate(length(
        min = 1,
        max = 128,
        message = "consumer group must be 1-128 characters"
    ))]
    pub consumer_group: String,

    /// Whether this sink is enabled
    ///
    /// Filled in by `default_true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Starting offset (earliest, latest, or timestamp)
    ///
    /// Defaults to `"latest"`. Timestamps must be valid RFC 3339, as
    /// enforced by `validate_start_offset`.
    #[serde(default = "default_start_offset")]
    #[validate(custom(function = "validate_start_offset"))]
    pub start_offset: String,

    /// Connector-specific configuration (Kafka Connect style)
    /// All connector parameters go here. Validated at runtime by the controller.
    /// Example for iceberg: {"catalog": {...}, "namespace": "...", "table": "..."}
    #[serde(default)]
    pub config: serde_json::Value,

    /// Secret reference for sensitive configuration (passwords, keys, tokens)
    /// The referenced Secret's data will be merged into config at runtime.
    ///
    /// Checked by `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Rate limiting configuration
    ///
    /// Validated recursively via `#[validate(nested)]`.
    #[serde(default)]
    #[validate(nested)]
    pub rate_limit: RateLimitSpec,
}
1802
/// Serde default for `SinkConnectorSpec::start_offset`: start from the
/// `latest` offset.
fn default_start_offset() -> String {
    String::from("latest")
}
1806
1807fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
1808    match offset {
1809        "earliest" | "latest" => Ok(()),
1810        s => {
1811            // Strict RFC 3339 parsing — rejects malformed timestamps like
1812            // "not-a-dateT:" that the previous heuristic accepted.
1813            chrono::DateTime::parse_from_rfc3339(s).map_err(|e| {
1814                ValidationError::new("invalid_start_offset").with_message(
1815                    format!(
1816                        "start offset must be 'earliest', 'latest', or a valid \
1817                         RFC 3339 timestamp (e.g. '2024-01-01T00:00:00Z'): {}",
1818                        e
1819                    )
1820                    .into(),
1821                )
1822            })?;
1823            Ok(())
1824        }
1825    }
1826}
1827
/// Rate limiting configuration for sinks
///
/// The derived `Default` (0 events per second, no burst capacity) matches
/// what serde produces for an empty object.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RateLimitSpec {
    /// Maximum events per second (0 = unlimited)
    ///
    /// Upper bound is 1,000,000. The field is unsigned, so the `min = 0`
    /// bound can never be violated.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 1_000_000,
        message = "events per second must be 0-1000000"
    ))]
    pub events_per_second: u64,

    /// Burst capacity (extra events above steady rate)
    ///
    /// Optional; when present it must not exceed 100,000.
    #[serde(default)]
    #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
    pub burst_capacity: Option<u64>,
}
1846
/// Global settings for all connectors
///
/// Each section falls back to its own `Default` when omitted and is
/// validated recursively via `#[validate(nested)]`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct GlobalConnectSettings {
    /// Topic auto-creation settings
    #[serde(default)]
    #[validate(nested)]
    pub topic: TopicSettingsSpec,

    /// Retry configuration
    #[serde(default)]
    #[validate(nested)]
    pub retry: RetryConfigSpec,

    /// Health check configuration
    #[serde(default)]
    #[validate(nested)]
    pub health: HealthConfigSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: ConnectMetricsSpec,
}
1871
/// Topic settings for auto-creation
///
/// `Default` is implemented by hand for this type so that it yields the same
/// values as the per-field serde defaults declared below.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TopicSettingsSpec {
    /// Enable automatic topic creation
    ///
    /// Filled in by `default_true` when omitted.
    #[serde(default = "default_true")]
    pub auto_create: bool,

    /// Default partitions for new topics
    ///
    /// Defaults to 1; must be between 1 and 1000.
    #[serde(default = "default_topic_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor
    ///
    /// Defaults to 1; must be between 1 and 10.
    #[serde(default = "default_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Fail if topic doesn't exist and auto_create is false
    ///
    /// Filled in by `default_true` when omitted.
    #[serde(default = "default_true")]
    pub require_topic_exists: bool,
}
1898
/// Serde default for `TopicSettingsSpec::default_partitions`: one partition.
fn default_topic_partitions() -> i32 {
    const PARTITIONS: i32 = 1;
    PARTITIONS
}
1902
/// Serde default for `TopicSettingsSpec::default_replication_factor`: a
/// single replica.
fn default_topic_replication() -> i32 {
    const REPLICATION_FACTOR: i32 = 1;
    REPLICATION_FACTOR
}
1906
1907impl Default for TopicSettingsSpec {
1908    fn default() -> Self {
1909        Self {
1910            auto_create: true,
1911            default_partitions: 1,
1912            default_replication_factor: 1,
1913            require_topic_exists: true,
1914        }
1915    }
1916}
1917
/// Retry configuration
///
/// `Default` is implemented by hand for this type so that it yields the same
/// values as the per-field serde defaults declared below.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RetryConfigSpec {
    /// Maximum retry attempts
    ///
    /// Defaults to 10; must be between 0 and 100.
    #[serde(default = "default_max_retries")]
    #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
    pub max_retries: i32,

    /// Initial backoff in milliseconds
    ///
    /// Defaults to 100; must be between 10 and 60000.
    #[serde(default = "default_initial_backoff_ms")]
    #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
    pub initial_backoff_ms: i64,

    /// Maximum backoff in milliseconds
    ///
    /// Defaults to 30000; must be between 100 and 3600000. Nothing here
    /// checks that it is at least `initial_backoff_ms`.
    #[serde(default = "default_max_backoff_ms")]
    #[validate(range(
        min = 100,
        max = 3600000,
        message = "max backoff must be 100-3600000ms"
    ))]
    pub max_backoff_ms: i64,

    /// Backoff multiplier
    ///
    /// Defaults to 2.0. No range validation is attached to this field.
    #[serde(default = "default_backoff_multiplier")]
    pub backoff_multiplier: f64,
}
1945
/// Serde default for `RetryConfigSpec::max_retries`: ten attempts.
fn default_max_retries() -> i32 {
    const MAX_RETRIES: i32 = 10;
    MAX_RETRIES
}
1949
/// Serde default for `RetryConfigSpec::initial_backoff_ms`: 100 ms.
fn default_initial_backoff_ms() -> i64 {
    const INITIAL_BACKOFF_MS: i64 = 100;
    INITIAL_BACKOFF_MS
}
1953
/// Serde default for `RetryConfigSpec::max_backoff_ms`: 30 seconds,
/// expressed in milliseconds.
fn default_max_backoff_ms() -> i64 {
    const MAX_BACKOFF_MS: i64 = 30_000;
    MAX_BACKOFF_MS
}
1957
/// Serde default for `RetryConfigSpec::backoff_multiplier`: 2.0.
fn default_backoff_multiplier() -> f64 {
    const MULTIPLIER: f64 = 2.0;
    MULTIPLIER
}
1961
1962impl Default for RetryConfigSpec {
1963    fn default() -> Self {
1964        Self {
1965            max_retries: 10,
1966            initial_backoff_ms: 100,
1967            max_backoff_ms: 30000,
1968            backoff_multiplier: 2.0,
1969        }
1970    }
1971}
1972
/// Health check configuration
///
/// `Default` is implemented by hand for this type so that it yields the same
/// values as the per-field serde defaults declared below.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HealthConfigSpec {
    /// Enable health check HTTP endpoint
    ///
    /// Defaults to `false`.
    #[serde(default)]
    pub enabled: bool,

    /// Health check port
    ///
    /// Defaults to 8080; must be between 1024 and 65535.
    #[serde(default = "default_health_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,

    /// Health check path
    ///
    /// Defaults to `/health`. No validation is attached to this field.
    #[serde(default = "default_health_path")]
    pub path: String,
}
1990
/// Serde default for `HealthConfigSpec::port`: 8080.
fn default_health_port() -> i32 {
    const HEALTH_PORT: i32 = 8080;
    HEALTH_PORT
}
1994
/// Serde default for `HealthConfigSpec::path`: `/health`.
fn default_health_path() -> String {
    String::from("/health")
}
1998
1999impl Default for HealthConfigSpec {
2000    fn default() -> Self {
2001        Self {
2002            enabled: false,
2003            port: 8080,
2004            path: "/health".to_string(),
2005        }
2006    }
2007}
2008
/// Metrics configuration for connect
///
/// `Default` is implemented by hand for this type so that it yields the same
/// values as the per-field serde defaults declared below.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectMetricsSpec {
    /// Enable metrics endpoint
    ///
    /// Defaults to `false`.
    #[serde(default)]
    pub enabled: bool,

    /// Metrics port
    ///
    /// Defaults to 9091; must be between 1024 and 65535.
    #[serde(default = "default_connect_metrics_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,
}
2022
/// Serde default for `ConnectMetricsSpec::port`: 9091.
fn default_connect_metrics_port() -> i32 {
    const METRICS_PORT: i32 = 9091;
    METRICS_PORT
}
2026
2027impl Default for ConnectMetricsSpec {
2028    fn default() -> Self {
2029        Self {
2030            enabled: false,
2031            port: 9091,
2032        }
2033    }
2034}
2035
/// TLS configuration for connect broker connection
///
/// Every field has a serde default, and the derived `Default` (all flags
/// off, no secrets) matches what an empty object deserializes to. The
/// validators here only check secret-name syntax; nothing in this struct
/// requires a secret name to be set when the matching flag is enabled.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectTlsSpec {
    /// Enable TLS for broker connection
    #[serde(default)]
    pub enabled: bool,

    /// Secret containing TLS certificates
    ///
    /// Checked by `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// CA secret name for mTLS
    ///
    /// Checked by `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,

    /// Skip server certificate verification (DANGEROUS - testing only)
    #[serde(default)]
    pub insecure: bool,
}
2062
/// Default replica count: a single replica.
///
/// NOTE(review): presumably the serde default for the connect spec's
/// replica field, which is declared outside this part of the file.
fn default_connect_replicas() -> i32 {
    const REPLICAS: i32 = 1;
    REPLICAS
}
2066
/// Status of a RivvenConnect resource
///
/// The derived `Default` yields the `Pending` phase, zeroed counters, empty
/// lists and no timestamps or message.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenConnectStatus {
    /// Current phase of the connect instance
    pub phase: ConnectPhase,

    /// Total replicas
    pub replicas: i32,

    /// Ready replicas
    pub ready_replicas: i32,

    /// Number of source connectors running
    pub sources_running: i32,

    /// Number of sink connectors running
    pub sinks_running: i32,

    /// Total number of sources configured
    pub sources_total: i32,

    /// Total number of sinks configured
    pub sinks_total: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing connect state
    ///
    /// Empty when omitted from the serialized status.
    #[serde(default)]
    pub conditions: Vec<ConnectCondition>,

    /// Individual connector statuses
    ///
    /// Empty when omitted from the serialized status.
    #[serde(default)]
    pub connector_statuses: Vec<ConnectorStatus>,

    /// Last time the status was updated
    ///
    /// NOTE(review): stored as a free-form string; the timestamp format is
    /// set by whoever writes the status — confirm against the controller.
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
2109
/// Phase of the connect lifecycle
///
/// `Pending` is the `Default` variant, so a freshly constructed status
/// starts in that phase.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ConnectPhase {
    /// Connect is being created
    #[default]
    Pending,
    /// Connect is starting up
    Starting,
    /// Connect is running and healthy
    Running,
    /// Connect is partially healthy (some connectors failed)
    Degraded,
    /// Connect has failed
    Failed,
    /// Connect is being deleted
    Terminating,
}
2127
/// Condition describing an aspect of connect state
///
/// `condition_type` and `status` are plain strings; nothing in this struct
/// restricts them to the values listed in the field docs.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectCondition {
    /// Type of condition (Ready, BrokerConnected, SourcesHealthy, SinksHealthy)
    ///
    /// Serialized under the key `type` (explicit rename, since `type` is a
    /// Rust keyword).
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    ///
    /// NOTE(review): stored as a free-form string; confirm the timestamp
    /// format against the code that populates it.
    pub last_transition_time: Option<String>,
}
2148
/// Status of an individual connector
///
/// **Limitation:** When `status_source` is `"synthetic"`, this status is
/// inferred from the CRD spec (`enabled` flag) and Deployment readiness,
/// not queried from the actual running connector instance. Fields like
/// `events_processed` are always zero and `state` reflects the *desired*
/// state rather than the *observed* runtime state. A connector reported as
/// "running" may still be initializing, failing internally, or experiencing
/// backpressure. Check `status_source` and `last_probed` to determine
/// whether the status reflects real health. To get true runtime health,
/// query the Connect worker's health/metrics endpoint directly.
///
/// All descriptive fields are plain strings; this struct does not restrict
/// them to the values listed in the field docs.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectorStatus {
    /// Connector name
    pub name: String,

    /// Connector type (source or sink)
    pub connector_type: String,

    /// Connector kind (postgres-cdc, stdout, etc.)
    pub kind: String,

    /// Current state (running, stopped, failed, disabled, pending)
    pub state: String,

    /// Origin of this status: `"synthetic"` (inferred from spec/deployment)
    /// or `"observed"` (probed from the running connector). Consumers should
    /// treat synthetic statuses as best-effort approximations.
    ///
    /// Deserializes to `"synthetic"` when the key is absent.
    #[serde(default = "default_status_source")]
    pub status_source: String,

    /// ISO 8601 timestamp of the last time the connector was actually probed
    /// for health. `None` means no probe has been performed (status is purely
    /// synthetic).
    pub last_probed: Option<String>,

    /// Number of events processed
    pub events_processed: i64,

    /// Last error message
    pub last_error: Option<String>,

    /// Last successful operation time
    pub last_success_time: Option<String>,
}
2195
/// Serde default for `ConnectorStatus::status_source`: statuses without an
/// explicit origin are treated as `synthetic`.
fn default_status_source() -> String {
    String::from("synthetic")
}
2199
2200impl RivvenConnectSpec {
2201    /// Get the full container image including version
2202    pub fn get_image(&self) -> String {
2203        if let Some(ref image) = self.image {
2204            image.clone()
2205        } else {
2206            format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2207        }
2208    }
2209
2210    /// Get labels for managed resources
2211    pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2212        let mut labels = BTreeMap::new();
2213        labels.insert(
2214            "app.kubernetes.io/name".to_string(),
2215            "rivven-connect".to_string(),
2216        );
2217        labels.insert(
2218            "app.kubernetes.io/instance".to_string(),
2219            connect_name.to_string(),
2220        );
2221        labels.insert(
2222            "app.kubernetes.io/component".to_string(),
2223            "connector".to_string(),
2224        );
2225        labels.insert(
2226            "app.kubernetes.io/managed-by".to_string(),
2227            "rivven-operator".to_string(),
2228        );
2229        labels.insert(
2230            "app.kubernetes.io/version".to_string(),
2231            self.version.clone(),
2232        );
2233        labels
2234    }
2235
2236    /// Get selector labels for managed resources
2237    pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2238        let mut labels = BTreeMap::new();
2239        labels.insert(
2240            "app.kubernetes.io/name".to_string(),
2241            "rivven-connect".to_string(),
2242        );
2243        labels.insert(
2244            "app.kubernetes.io/instance".to_string(),
2245            connect_name.to_string(),
2246        );
2247        labels
2248    }
2249
2250    /// Get enabled sources count
2251    pub fn enabled_sources_count(&self) -> usize {
2252        self.sources.iter().filter(|s| s.enabled).count()
2253    }
2254
2255    /// Get enabled sinks count
2256    pub fn enabled_sinks_count(&self) -> usize {
2257        self.sinks.iter().filter(|s| s.enabled).count()
2258    }
2259}
2260
2261// ============================================================================
2262// Advanced CDC Configuration Types (Shared between PostgreSQL and MySQL CDC)
2263// ============================================================================
2264// These types map to rivven-cdc features and provide enterprise-grade CDC capabilities.
2265
2266/// Snapshot configuration for initial data load
2267#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2268#[serde(rename_all = "camelCase")]
2269pub struct SnapshotCdcConfigSpec {
2270    /// Batch size for SELECT queries (rows per batch)
2271    #[serde(default = "default_snapshot_batch_size")]
2272    #[validate(range(min = 100, max = 1000000, message = "batch size must be 100-1000000"))]
2273    pub batch_size: i32,
2274
2275    /// Number of tables to snapshot in parallel
2276    #[serde(default = "default_snapshot_parallel_tables")]
2277    #[validate(range(min = 1, max = 32, message = "parallel tables must be 1-32"))]
2278    pub parallel_tables: i32,
2279
2280    /// Query timeout in seconds
2281    #[serde(default = "default_snapshot_query_timeout")]
2282    #[validate(range(min = 10, max = 3600, message = "query timeout must be 10-3600s"))]
2283    pub query_timeout_secs: i32,
2284
2285    /// Delay between batches for backpressure (ms)
2286    #[serde(default)]
2287    #[validate(range(min = 0, max = 60000, message = "throttle delay must be 0-60000ms"))]
2288    pub throttle_delay_ms: i32,
2289
2290    /// Maximum retries per batch on failure
2291    #[serde(default = "default_snapshot_max_retries")]
2292    #[validate(range(min = 0, max = 10, message = "max retries must be 0-10"))]
2293    pub max_retries: i32,
2294
2295    /// Tables to include in snapshot (empty = all tables)
2296    #[serde(default)]
2297    pub include_tables: Vec<String>,
2298
2299    /// Tables to exclude from snapshot
2300    #[serde(default)]
2301    pub exclude_tables: Vec<String>,
2302}
2303
2304fn default_snapshot_batch_size() -> i32 {
2305    10_000
2306}
2307fn default_snapshot_parallel_tables() -> i32 {
2308    4
2309}
2310fn default_snapshot_query_timeout() -> i32 {
2311    300
2312}
2313fn default_snapshot_max_retries() -> i32 {
2314    3
2315}
2316
2317/// Incremental snapshot configuration (non-blocking re-snapshots)
2318#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2319#[serde(rename_all = "camelCase")]
2320pub struct IncrementalSnapshotSpec {
2321    /// Enable incremental snapshots
2322    #[serde(default)]
2323    pub enabled: bool,
2324
2325    /// Chunk size (rows per chunk)
2326    #[serde(default = "default_incremental_chunk_size")]
2327    #[validate(range(min = 100, max = 100000, message = "chunk size must be 100-100000"))]
2328    pub chunk_size: i32,
2329
2330    /// Watermark strategy: insert, update_and_insert
2331    #[serde(default)]
2332    #[validate(custom(function = "validate_watermark_strategy"))]
2333    pub watermark_strategy: String,
2334
2335    /// Signal table for watermark signals
2336    #[serde(default)]
2337    pub watermark_signal_table: Option<String>,
2338
2339    /// Maximum concurrent chunks
2340    #[serde(default = "default_incremental_max_chunks")]
2341    #[validate(range(min = 1, max = 16, message = "max concurrent chunks must be 1-16"))]
2342    pub max_concurrent_chunks: i32,
2343
2344    /// Delay between chunks (ms)
2345    #[serde(default)]
2346    #[validate(range(min = 0, max = 60000, message = "chunk delay must be 0-60000ms"))]
2347    pub chunk_delay_ms: i32,
2348}
2349
2350fn default_incremental_chunk_size() -> i32 {
2351    1024
2352}
2353fn default_incremental_max_chunks() -> i32 {
2354    1
2355}
2356
2357fn validate_watermark_strategy(strategy: &str) -> Result<(), ValidationError> {
2358    match strategy {
2359        "" | "insert" | "update_and_insert" => Ok(()),
2360        _ => Err(ValidationError::new("invalid_watermark_strategy")
2361            .with_message("watermark strategy must be: insert or update_and_insert".into())),
2362    }
2363}
2364
2365/// Signal table configuration for ad-hoc snapshots and control
2366#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2367#[serde(rename_all = "camelCase")]
2368pub struct SignalTableSpec {
2369    /// Enable signal processing
2370    #[serde(default)]
2371    pub enabled: bool,
2372
2373    /// Fully-qualified signal table name (schema.table)
2374    #[serde(default)]
2375    pub data_collection: Option<String>,
2376
2377    /// Topic for signal messages (alternative to table)
2378    #[serde(default)]
2379    pub topic: Option<String>,
2380
2381    /// Enabled signal channels: source, topic, file, api
2382    #[serde(default)]
2383    pub enabled_channels: Vec<String>,
2384
2385    /// Poll interval for file channel (ms)
2386    #[serde(default = "default_signal_poll_interval")]
2387    pub poll_interval_ms: i32,
2388}
2389
2390fn default_signal_poll_interval() -> i32 {
2391    1000
2392}
2393
2394/// Heartbeat monitoring configuration
2395#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2396#[serde(rename_all = "camelCase")]
2397pub struct HeartbeatCdcSpec {
2398    /// Enable heartbeat monitoring
2399    #[serde(default)]
2400    pub enabled: bool,
2401
2402    /// Heartbeat interval in seconds
2403    #[serde(default = "default_heartbeat_cdc_interval")]
2404    #[validate(range(min = 1, max = 3600, message = "heartbeat interval must be 1-3600s"))]
2405    pub interval_secs: i32,
2406
2407    /// Maximum allowed lag before marking as unhealthy (seconds)
2408    #[serde(default = "default_heartbeat_max_lag")]
2409    #[validate(range(min = 10, max = 86400, message = "max lag must be 10-86400s"))]
2410    pub max_lag_secs: i32,
2411
2412    /// Emit heartbeat events to a topic
2413    #[serde(default)]
2414    pub emit_events: bool,
2415
2416    /// Topic name for heartbeat events
2417    #[serde(default)]
2418    pub topic: Option<String>,
2419
2420    /// SQL query to execute on each heartbeat (keeps connections alive)
2421    #[serde(default)]
2422    pub action_query: Option<String>,
2423}
2424
2425fn default_heartbeat_cdc_interval() -> i32 {
2426    10
2427}
2428fn default_heartbeat_max_lag() -> i32 {
2429    300
2430}
2431
2432/// Event deduplication configuration
2433#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2434#[serde(rename_all = "camelCase")]
2435pub struct DeduplicationCdcSpec {
2436    /// Enable deduplication
2437    #[serde(default)]
2438    pub enabled: bool,
2439
2440    /// Bloom filter expected insertions
2441    #[serde(default = "default_bloom_expected")]
2442    #[validate(range(
2443        min = 1000,
2444        max = 10000000,
2445        message = "bloom expected must be 1000-10M"
2446    ))]
2447    pub bloom_expected_insertions: i64,
2448
2449    /// Bloom filter false positive rate (0.001-0.1)
2450    #[serde(default = "default_bloom_fpp")]
2451    pub bloom_fpp: f64,
2452
2453    /// LRU cache size
2454    #[serde(default = "default_lru_size")]
2455    #[validate(range(min = 1000, max = 1000000, message = "LRU size must be 1000-1M"))]
2456    pub lru_size: i64,
2457
2458    /// Deduplication window (seconds)
2459    #[serde(default = "default_dedup_window")]
2460    #[validate(range(min = 60, max = 604800, message = "window must be 60-604800s"))]
2461    pub window_secs: i64,
2462}
2463
2464fn default_bloom_expected() -> i64 {
2465    100_000
2466}
2467fn default_bloom_fpp() -> f64 {
2468    0.01
2469}
2470fn default_lru_size() -> i64 {
2471    10_000
2472}
2473fn default_dedup_window() -> i64 {
2474    3600
2475}
2476
2477/// Transaction topic configuration
2478#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2479#[serde(rename_all = "camelCase")]
2480pub struct TransactionTopicSpec {
2481    /// Enable transaction topic
2482    #[serde(default)]
2483    pub enabled: bool,
2484
2485    /// Transaction topic name (default: {database}.transaction)
2486    #[serde(default)]
2487    pub topic_name: Option<String>,
2488
2489    /// Include transaction data collections in events
2490    #[serde(default = "default_true_cdc")]
2491    pub include_data_collections: bool,
2492
2493    /// Minimum events in transaction to emit (filter small txns)
2494    #[serde(default)]
2495    #[validate(range(min = 0, max = 10000, message = "min events must be 0-10000"))]
2496    pub min_events_threshold: i32,
2497}
2498
2499fn default_true_cdc() -> bool {
2500    true
2501}
2502
2503/// Schema change topic configuration
2504#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2505#[serde(rename_all = "camelCase")]
2506pub struct SchemaChangeTopicSpec {
2507    /// Enable schema change topic
2508    #[serde(default)]
2509    pub enabled: bool,
2510
2511    /// Schema change topic name (default: {database}.schema_changes)
2512    #[serde(default)]
2513    pub topic_name: Option<String>,
2514
2515    /// Include table columns in change events
2516    #[serde(default = "default_true_cdc")]
2517    pub include_columns: bool,
2518
2519    /// Schemas to monitor for changes (empty = all)
2520    #[serde(default)]
2521    pub schemas: Vec<String>,
2522}
2523
2524/// Tombstone configuration for log compaction
2525#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2526#[serde(rename_all = "camelCase")]
2527pub struct TombstoneCdcSpec {
2528    /// Enable tombstone events for deletes
2529    #[serde(default)]
2530    pub enabled: bool,
2531
2532    /// Emit tombstone after delete event
2533    #[serde(default = "default_true_cdc")]
2534    pub after_delete: bool,
2535
2536    /// Tombstone behavior: emit_null, emit_with_key
2537    #[serde(default)]
2538    #[validate(custom(function = "validate_tombstone_behavior"))]
2539    pub behavior: String,
2540}
2541
2542fn validate_tombstone_behavior(behavior: &str) -> Result<(), ValidationError> {
2543    match behavior {
2544        "" | "emit_null" | "emit_with_key" => Ok(()),
2545        _ => Err(ValidationError::new("invalid_tombstone_behavior")
2546            .with_message("tombstone behavior must be: emit_null or emit_with_key".into())),
2547    }
2548}
2549
2550/// Field-level encryption configuration
2551#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2552#[serde(rename_all = "camelCase")]
2553pub struct FieldEncryptionSpec {
2554    /// Enable field encryption
2555    #[serde(default)]
2556    pub enabled: bool,
2557
2558    /// Secret reference containing encryption key
2559    #[serde(default)]
2560    #[validate(custom(function = "validate_optional_k8s_name"))]
2561    pub key_secret_ref: Option<String>,
2562
2563    /// Fields to encrypt (table.column format)
2564    #[serde(default)]
2565    pub fields: Vec<String>,
2566
2567    /// Encryption algorithm (default: aes-256-gcm)
2568    #[serde(default = "default_encryption_algorithm")]
2569    pub algorithm: String,
2570}
2571
2572fn default_encryption_algorithm() -> String {
2573    "aes-256-gcm".to_string()
2574}
2575
2576/// Read-only replica configuration
2577#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2578#[serde(rename_all = "camelCase")]
2579pub struct ReadOnlyReplicaSpec {
2580    /// Enable read-only mode (for connecting to replicas)
2581    #[serde(default)]
2582    pub enabled: bool,
2583
2584    /// Replication lag threshold before warnings (ms)
2585    #[serde(default = "default_lag_threshold")]
2586    #[validate(range(
2587        min = 100,
2588        max = 300000,
2589        message = "lag threshold must be 100-300000ms"
2590    ))]
2591    pub lag_threshold_ms: i64,
2592
2593    /// Enable deduplication (required for replicas)
2594    #[serde(default = "default_true_cdc")]
2595    pub deduplicate: bool,
2596
2597    /// Watermark source: primary, replica
2598    #[serde(default)]
2599    #[validate(custom(function = "validate_watermark_source"))]
2600    pub watermark_source: String,
2601}
2602
2603fn default_lag_threshold() -> i64 {
2604    5000
2605}
2606
2607fn validate_watermark_source(source: &str) -> Result<(), ValidationError> {
2608    match source {
2609        "" | "primary" | "replica" => Ok(()),
2610        _ => Err(ValidationError::new("invalid_watermark_source")
2611            .with_message("watermark source must be: primary or replica".into())),
2612    }
2613}
2614
/// Event router configuration with dead letter queue support
///
/// Every field uses the plain serde default, so the derived `Default`
/// (routing disabled, no destinations, no rules) matches an empty object.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct EventRouterSpec {
    /// Enable event routing
    #[serde(default)]
    pub enabled: bool,

    /// Default destination when no rules match
    #[serde(default)]
    pub default_destination: Option<String>,

    /// Dead letter queue for unroutable events
    #[serde(default)]
    pub dead_letter_queue: Option<String>,

    /// Drop unroutable events instead of sending to DLQ
    #[serde(default)]
    pub drop_unroutable: bool,

    /// Routing rules (evaluated in priority order)
    ///
    /// Each rule is validated recursively via `#[validate(nested)]`; no
    /// limit on the number of rules is enforced here.
    #[serde(default)]
    #[validate(nested)]
    pub rules: Vec<RouteRuleSpec>,
}
2640
/// Routing rule specification
///
/// `name`, `condition_type` and `destinations` are required; the other
/// fields have serde defaults.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RouteRuleSpec {
    /// Rule name (for logging/debugging); 1-128 characters
    #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
    pub name: String,

    /// Priority (higher = evaluated first)
    ///
    /// Defaults to 0.
    #[serde(default)]
    pub priority: i32,

    /// Routing condition type: always, table, table_pattern, schema, operation, field_equals, field_exists
    ///
    /// Checked by `validate_route_condition_type`.
    #[validate(custom(function = "validate_route_condition_type"))]
    pub condition_type: String,

    /// Condition value (table name, pattern, field, etc.)
    ///
    /// Nothing in this struct requires it to be set for condition types
    /// that need a value.
    #[serde(default)]
    pub condition_value: Option<String>,

    /// Second condition value (for field_equals: expected value)
    #[serde(default)]
    pub condition_value2: Option<String>,

    /// Destination(s) for matching events
    ///
    /// No validation is attached, so an empty list passes the checks here.
    pub destinations: Vec<String>,

    /// Continue evaluating rules after match (fan-out)
    #[serde(default)]
    pub continue_matching: bool,
}
2672
2673fn validate_route_condition_type(condition: &str) -> Result<(), ValidationError> {
2674    match condition {
2675        "always" | "table" | "table_pattern" | "schema" | "operation" | "field_equals"
2676        | "field_exists" => Ok(()),
2677        _ => Err(ValidationError::new("invalid_route_condition_type").with_message(
2678            "condition type must be: always, table, table_pattern, schema, operation, field_equals, or field_exists".into(),
2679        )),
2680    }
2681}
2682
2683/// Partitioner configuration for Kafka-style partitioning
2684#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2685#[serde(rename_all = "camelCase")]
2686pub struct PartitionerSpec {
2687    /// Enable partitioning
2688    #[serde(default)]
2689    pub enabled: bool,
2690
2691    /// Number of partitions
2692    #[serde(default = "default_num_partitions")]
2693    #[validate(range(min = 1, max = 10000, message = "num partitions must be 1-10000"))]
2694    pub num_partitions: i32,
2695
2696    /// Partitioning strategy: round_robin, key_hash, table_hash, full_table_hash, sticky
2697    #[serde(default = "default_partition_strategy")]
2698    #[validate(custom(function = "validate_partition_strategy"))]
2699    pub strategy: String,
2700
2701    /// Key columns for key_hash strategy
2702    #[serde(default)]
2703    pub key_columns: Vec<String>,
2704
2705    /// Fixed partition for sticky strategy
2706    #[serde(default)]
2707    pub sticky_partition: Option<i32>,
2708}
2709
2710fn default_num_partitions() -> i32 {
2711    16
2712}
2713fn default_partition_strategy() -> String {
2714    "key_hash".to_string()
2715}
2716
2717fn validate_partition_strategy(strategy: &str) -> Result<(), ValidationError> {
2718    match strategy {
2719        "round_robin" | "key_hash" | "table_hash" | "full_table_hash" | "sticky" => Ok(()),
2720        _ => Err(ValidationError::new("invalid_partition_strategy").with_message(
2721            "partition strategy must be: round_robin, key_hash, table_hash, full_table_hash, or sticky".into(),
2722        )),
2723    }
2724}
2725
/// Single Message Transform (SMT) configuration
// `transformType` has no serde default and is therefore required on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SmtTransformSpec {
    /// Transform type (see SMT documentation for available transforms)
    // Checked against the fixed list in `validate_smt_transform_type`.
    #[validate(custom(function = "validate_smt_transform_type"))]
    pub transform_type: String,

    /// Transform name (optional, for logging)
    #[serde(default)]
    pub name: Option<String>,

    /// Transform-specific configuration
    // Free-form JSON: no validator is attached, and an omitted key falls back
    // to `serde_json::Value::default()`.
    #[serde(default)]
    pub config: serde_json::Value,
}
2742
2743fn validate_smt_transform_type(transform: &str) -> Result<(), ValidationError> {
2744    match transform {
2745        "extract_new_record_state" | "value_to_key" | "timestamp_converter"
2746        | "timezone_converter" | "mask_field" | "filter" | "flatten" | "insert_field"
2747        | "rename_field" | "replace_field" | "cast" | "regex_router" | "content_router"
2748        | "header_to_value" | "unwrap" | "set_null" | "compute_field" | "conditional_smt" => Ok(()),
2749        _ => Err(ValidationError::new("invalid_smt_transform_type").with_message(
2750            "transform type must be a valid SMT (extract_new_record_state, mask_field, filter, etc.)".into(),
2751        )),
2752    }
2753}
2754
2755/// Parallel CDC processing configuration
2756#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2757#[serde(rename_all = "camelCase")]
2758pub struct ParallelCdcSpec {
2759    /// Enable parallel processing
2760    #[serde(default)]
2761    pub enabled: bool,
2762
2763    /// Maximum concurrent table workers
2764    #[serde(default = "default_parallel_concurrency")]
2765    #[validate(range(min = 1, max = 64, message = "concurrency must be 1-64"))]
2766    pub concurrency: i32,
2767
2768    /// Buffer size per table
2769    #[serde(default = "default_per_table_buffer")]
2770    #[validate(range(
2771        min = 100,
2772        max = 100000,
2773        message = "per table buffer must be 100-100000"
2774    ))]
2775    pub per_table_buffer: i32,
2776
2777    /// Output channel buffer size
2778    #[serde(default = "default_output_buffer")]
2779    #[validate(range(min = 1000, max = 1000000, message = "output buffer must be 1000-1M"))]
2780    pub output_buffer: i32,
2781
2782    /// Enable work stealing between workers
2783    #[serde(default = "default_true_cdc")]
2784    pub work_stealing: bool,
2785
2786    /// Maximum events per second per table (None = unlimited)
2787    #[serde(default)]
2788    pub per_table_rate_limit: Option<i64>,
2789
2790    /// Shutdown timeout in seconds
2791    #[serde(default = "default_shutdown_timeout")]
2792    #[validate(range(min = 1, max = 300, message = "shutdown timeout must be 1-300s"))]
2793    pub shutdown_timeout_secs: i32,
2794}
2795
2796fn default_parallel_concurrency() -> i32 {
2797    4
2798}
2799fn default_per_table_buffer() -> i32 {
2800    1000
2801}
2802fn default_output_buffer() -> i32 {
2803    10_000
2804}
2805fn default_shutdown_timeout() -> i32 {
2806    30
2807}
2808
2809/// Outbox pattern configuration
2810#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2811#[serde(rename_all = "camelCase")]
2812pub struct OutboxSpec {
2813    /// Enable outbox pattern
2814    #[serde(default)]
2815    pub enabled: bool,
2816
2817    /// Outbox table name
2818    #[serde(default = "default_outbox_table")]
2819    #[validate(length(min = 1, max = 128, message = "outbox table must be 1-128 characters"))]
2820    pub table_name: String,
2821
2822    /// Poll interval in milliseconds
2823    #[serde(default = "default_outbox_poll_interval")]
2824    #[validate(range(min = 10, max = 60000, message = "poll interval must be 10-60000ms"))]
2825    pub poll_interval_ms: i32,
2826
2827    /// Maximum events per batch
2828    #[serde(default = "default_outbox_batch_size")]
2829    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
2830    pub batch_size: i32,
2831
2832    /// Maximum delivery retries before DLQ
2833    #[serde(default = "default_outbox_max_retries")]
2834    #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2835    pub max_retries: i32,
2836
2837    /// Delivery timeout in seconds
2838    #[serde(default = "default_outbox_timeout")]
2839    #[validate(range(min = 1, max = 300, message = "timeout must be 1-300s"))]
2840    pub delivery_timeout_secs: i32,
2841
2842    /// Enable ordered delivery per aggregate
2843    #[serde(default = "default_true_cdc")]
2844    pub ordered_delivery: bool,
2845
2846    /// Retention period for delivered events in seconds
2847    #[serde(default = "default_outbox_retention")]
2848    #[validate(range(min = 60, max = 604800, message = "retention must be 60-604800s"))]
2849    pub retention_secs: i64,
2850
2851    /// Maximum concurrent deliveries
2852    #[serde(default = "default_outbox_concurrency")]
2853    #[validate(range(min = 1, max = 100, message = "concurrency must be 1-100"))]
2854    pub max_concurrency: i32,
2855}
2856
2857fn default_outbox_table() -> String {
2858    "outbox".to_string()
2859}
2860fn default_outbox_poll_interval() -> i32 {
2861    100
2862}
2863fn default_outbox_batch_size() -> i32 {
2864    100
2865}
2866fn default_outbox_max_retries() -> i32 {
2867    3
2868}
2869fn default_outbox_timeout() -> i32 {
2870    30
2871}
2872fn default_outbox_retention() -> i64 {
2873    86400
2874}
2875fn default_outbox_concurrency() -> i32 {
2876    10
2877}
2878
2879/// Health monitoring configuration for CDC connectors
2880#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2881#[serde(rename_all = "camelCase")]
2882pub struct HealthMonitorSpec {
2883    /// Enable health monitoring
2884    #[serde(default)]
2885    pub enabled: bool,
2886
2887    /// Interval between health checks in seconds
2888    #[serde(default = "default_health_check_interval")]
2889    #[validate(range(min = 1, max = 300, message = "check interval must be 1-300s"))]
2890    pub check_interval_secs: i32,
2891
2892    /// Maximum allowed replication lag in milliseconds
2893    #[serde(default = "default_health_max_lag")]
2894    #[validate(range(min = 1000, max = 3600000, message = "max lag must be 1000-3600000ms"))]
2895    pub max_lag_ms: i64,
2896
2897    /// Number of failed checks before marking unhealthy
2898    #[serde(default = "default_health_failure_threshold")]
2899    #[validate(range(min = 1, max = 10, message = "failure threshold must be 1-10"))]
2900    pub failure_threshold: i32,
2901
2902    /// Number of successful checks to recover
2903    #[serde(default = "default_health_success_threshold")]
2904    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
2905    pub success_threshold: i32,
2906
2907    /// Timeout for individual health checks in seconds
2908    #[serde(default = "default_health_check_timeout")]
2909    #[validate(range(min = 1, max = 60, message = "check timeout must be 1-60s"))]
2910    pub check_timeout_secs: i32,
2911
2912    /// Enable automatic recovery on failure
2913    #[serde(default = "default_true_cdc")]
2914    pub auto_recovery: bool,
2915
2916    /// Initial recovery delay in seconds
2917    #[serde(default = "default_health_recovery_delay")]
2918    #[validate(range(min = 1, max = 300, message = "recovery delay must be 1-300s"))]
2919    pub recovery_delay_secs: i32,
2920
2921    /// Maximum recovery delay in seconds (for exponential backoff)
2922    #[serde(default = "default_health_max_recovery_delay")]
2923    #[validate(range(min = 1, max = 3600, message = "max recovery delay must be 1-3600s"))]
2924    pub max_recovery_delay_secs: i32,
2925}
2926
2927fn default_health_check_interval() -> i32 {
2928    10
2929}
2930fn default_health_max_lag() -> i64 {
2931    30_000
2932}
2933fn default_health_failure_threshold() -> i32 {
2934    3
2935}
2936fn default_health_success_threshold() -> i32 {
2937    2
2938}
2939fn default_health_check_timeout() -> i32 {
2940    5
2941}
2942fn default_health_recovery_delay() -> i32 {
2943    1
2944}
2945fn default_health_max_recovery_delay() -> i32 {
2946    60
2947}
2948
2949// ============================================================================
2950// RivvenSchemaRegistry CRD - Schema Registry for Rivven
2951// ============================================================================
2952
/// RivvenSchemaRegistry custom resource for managing Schema Registry instances
///
/// This CRD allows declarative management of Rivven Schema Registry deployments,
/// a Confluent-compatible schema registry supporting Avro, JSON Schema, and Protobuf.
///
/// # Example
///
/// ```yaml
/// apiVersion: rivven.hupe1980.github.io/v1alpha1
/// kind: RivvenSchemaRegistry
/// metadata:
///   name: production-registry
///   namespace: rivven
/// spec:
///   clusterRef:
///     name: my-rivven-cluster
///   replicas: 3
///   version: "0.0.1"
///   storage:
///     mode: broker
///     topic: _schemas
///   compatibility:
///     defaultLevel: BACKWARD
///   schemas:
///     avro: true
///     jsonSchema: true
///     protobuf: true
///   contexts:
///     enabled: true
///   tls:
///     enabled: true
///     certSecretName: schema-registry-tls
/// ```
// Only `clusterRef` is required on input; every other field has a serde
// default. Keys are camelCase on the wire.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.hupe1980.github.io",
    version = "v1alpha1",
    kind = "RivvenSchemaRegistry",
    plural = "rivvenschemaregistries",
    shortname = "rsr",
    namespaced,
    status = "RivvenSchemaRegistryStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Ready","type":"integer","jsonPath":".status.readyReplicas"}"#,
    printcolumn = r#"{"name":"Schemas","type":"integer","jsonPath":".status.schemasRegistered"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenSchemaRegistrySpec {
    /// Reference to the RivvenCluster this registry connects to
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Number of registry replicas (1-10)
    #[serde(default = "default_schema_registry_replicas")]
    #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
    pub replicas: i32,

    /// Schema Registry version
    #[serde(default = "default_version")]
    pub version: String,

    /// Custom container image (overrides version-based default)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    /// Image pull secrets
    #[serde(default)]
    pub image_pull_secrets: Vec<String>,

    /// Resource requests/limits
    // Held as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub resources: Option<serde_json::Value>,

    /// HTTP server configuration
    #[serde(default)]
    #[validate(nested)]
    pub server: SchemaRegistryServerSpec,

    /// Storage backend configuration
    #[serde(default)]
    #[validate(nested)]
    pub storage: SchemaRegistryStorageSpec,

    /// Compatibility configuration
    #[serde(default)]
    #[validate(nested)]
    pub compatibility: SchemaCompatibilitySpec,

    /// Schema format support configuration
    #[serde(default)]
    #[validate(nested)]
    pub schemas: SchemaFormatSpec,

    /// Schema contexts (multi-tenant) configuration
    #[serde(default)]
    #[validate(nested)]
    pub contexts: SchemaContextsSpec,

    /// Validation rules configuration
    #[serde(default)]
    #[validate(nested)]
    pub validation: SchemaValidationSpec,

    /// Authentication configuration
    #[serde(default)]
    #[validate(nested)]
    pub auth: SchemaRegistryAuthSpec,

    /// TLS configuration
    #[serde(default)]
    #[validate(nested)]
    pub tls: SchemaRegistryTlsSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: SchemaRegistryMetricsSpec,

    /// External registry configuration (for mirroring/sync)
    #[serde(default)]
    #[validate(nested)]
    pub external: ExternalRegistrySpec,

    /// Pod annotations
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    /// Pod labels
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Environment variables
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Node selector for pod scheduling
    #[serde(default)]
    pub node_selector: BTreeMap<String, String>,

    /// Pod tolerations
    #[serde(default)]
    #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod affinity rules
    // Held as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub affinity: Option<serde_json::Value>,

    /// Service account name
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Pod security context
    // Held as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub security_context: Option<serde_json::Value>,

    /// Container security context
    // Held as free-form JSON; no validator is attached to this field.
    #[serde(default)]
    pub container_security_context: Option<serde_json::Value>,

    /// Liveness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub liveness_probe: ProbeSpec,

    /// Readiness probe configuration
    #[serde(default)]
    #[validate(nested)]
    pub readiness_probe: ProbeSpec,

    /// Pod disruption budget configuration
    #[serde(default)]
    #[validate(nested)]
    pub pod_disruption_budget: PdbSpec,
}
3141
/// Serde default for `RivvenSchemaRegistrySpec::replicas`: a single replica.
fn default_schema_registry_replicas() -> i32 {
    const SINGLE_REPLICA: i32 = 1;
    SINGLE_REPLICA
}
3145
/// HTTP server configuration for Schema Registry
// Every field has a serde default, so the whole block may be omitted on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryServerSpec {
    /// HTTP server port (1024-65535)
    #[serde(default = "default_schema_registry_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,

    /// Bind address (default: 0.0.0.0)
    // No validator is attached; the string is accepted as given.
    #[serde(default = "default_bind_address")]
    pub bind_address: String,

    /// Request timeout in seconds
    #[serde(default = "default_request_timeout")]
    #[validate(range(min = 1, max = 300, message = "timeout must be 1-300 seconds"))]
    pub timeout_seconds: i32,

    /// Maximum request body size in bytes (default: 10MB)
    // Bounds are 1024 (1 KiB) and 104857600 (100 MiB) inclusive.
    #[serde(default = "default_max_request_size")]
    #[validate(range(
        min = 1024,
        max = 104857600,
        message = "max request size must be 1KB-100MB"
    ))]
    pub max_request_size: i64,

    /// Enable CORS (for web clients)
    #[serde(default)]
    pub cors_enabled: bool,

    /// CORS allowed origins (empty = all)
    // No validator is attached; entries are accepted as given.
    #[serde(default)]
    pub cors_allowed_origins: Vec<String>,
}
3181
/// Serde default for `SchemaRegistryServerSpec::port`.
fn default_schema_registry_port() -> i32 {
    const PORT: i32 = 8081;
    PORT
}

/// Serde default for `SchemaRegistryServerSpec::bind_address`.
fn default_bind_address() -> String {
    String::from("0.0.0.0")
}

/// Serde default for `SchemaRegistryServerSpec::timeout_seconds`.
fn default_request_timeout() -> i32 {
    const TIMEOUT_SECONDS: i32 = 30;
    TIMEOUT_SECONDS
}

/// Serde default for `SchemaRegistryServerSpec::max_request_size` (10 MiB).
fn default_max_request_size() -> i64 {
    const MIB: i64 = 1024 * 1024;
    10 * MIB
}
3197
3198impl Default for SchemaRegistryServerSpec {
3199    fn default() -> Self {
3200        Self {
3201            port: 8081,
3202            bind_address: "0.0.0.0".to_string(),
3203            timeout_seconds: 30,
3204            max_request_size: 10_485_760,
3205            cors_enabled: false,
3206            cors_allowed_origins: vec![],
3207        }
3208    }
3209}
3210
/// Storage configuration for Schema Registry
// Every field has a serde default, so the whole block may be omitted on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryStorageSpec {
    /// Storage mode: memory, broker
    #[serde(default = "default_storage_mode")]
    #[validate(custom(function = "validate_storage_mode"))]
    pub mode: String,

    /// Rivven topic for broker-backed storage (required if mode=broker)
    // NOTE(review): only the 255-character upper bound is enforced here; an
    // empty topic passes validation even when `mode` is "broker".
    #[serde(default = "default_schema_topic")]
    #[validate(length(max = 255, message = "topic name max 255 characters"))]
    pub topic: String,

    /// Topic replication factor
    #[serde(default = "default_schema_topic_replication")]
    #[validate(range(min = 1, max = 10, message = "replication factor must be 1-10"))]
    pub replication_factor: i32,

    /// Topic partitions (usually 1 for ordered processing)
    #[serde(default = "default_schema_topic_partitions")]
    #[validate(range(min = 1, max = 100, message = "partitions must be 1-100"))]
    pub partitions: i32,

    /// Enable schema normalization
    // `default_true` is defined outside this section of the file.
    #[serde(default = "default_true")]
    pub normalize: bool,
}
3239
/// Serde default for `SchemaRegistryStorageSpec::mode`.
fn default_storage_mode() -> String {
    String::from("broker")
}

/// Serde default for `SchemaRegistryStorageSpec::topic`.
fn default_schema_topic() -> String {
    String::from("_schemas")
}

/// Serde default for `SchemaRegistryStorageSpec::replication_factor`.
fn default_schema_topic_replication() -> i32 {
    const REPLICATION_FACTOR: i32 = 3;
    REPLICATION_FACTOR
}

/// Serde default for `SchemaRegistryStorageSpec::partitions`.
fn default_schema_topic_partitions() -> i32 {
    const PARTITIONS: i32 = 1;
    PARTITIONS
}
3255
3256fn validate_storage_mode(mode: &str) -> Result<(), ValidationError> {
3257    match mode {
3258        "memory" | "broker" => Ok(()),
3259        _ => Err(ValidationError::new("invalid_storage_mode")
3260            .with_message("storage mode must be 'memory' or 'broker'".into())),
3261    }
3262}
3263
3264impl Default for SchemaRegistryStorageSpec {
3265    fn default() -> Self {
3266        Self {
3267            mode: "broker".to_string(),
3268            topic: "_schemas".to_string(),
3269            replication_factor: 3,
3270            partitions: 1,
3271            normalize: true,
3272        }
3273    }
3274}
3275
/// Schema compatibility configuration
// Keys are camelCase on the wire: `defaultLevel`, `allowOverrides`, `subjects`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaCompatibilitySpec {
    /// Default compatibility level for new subjects
    #[serde(default = "default_compatibility_level")]
    #[validate(custom(function = "validate_compatibility_level"))]
    pub default_level: String,

    /// Allow per-subject compatibility overrides
    // `default_true` is defined outside this section of the file.
    #[serde(default = "default_true")]
    pub allow_overrides: bool,

    /// Per-subject compatibility levels
    // Map of subject name -> compatibility level; size, key length and each
    // level are checked by `validate_subject_compatibility_map`.
    #[serde(default)]
    #[validate(custom(function = "validate_subject_compatibility_map"))]
    pub subjects: BTreeMap<String, String>,
}
3294
/// Serde default for `SchemaCompatibilitySpec::default_level`.
fn default_compatibility_level() -> String {
    String::from("BACKWARD")
}
3298
3299fn validate_compatibility_level(level: &str) -> Result<(), ValidationError> {
3300    let valid_levels = [
3301        "BACKWARD",
3302        "BACKWARD_TRANSITIVE",
3303        "FORWARD",
3304        "FORWARD_TRANSITIVE",
3305        "FULL",
3306        "FULL_TRANSITIVE",
3307        "NONE",
3308    ];
3309    if valid_levels.contains(&level) {
3310        Ok(())
3311    } else {
3312        Err(
3313            ValidationError::new("invalid_compatibility_level").with_message(
3314                format!(
3315                    "'{}' is not valid. Must be one of: {:?}",
3316                    level, valid_levels
3317                )
3318                .into(),
3319            ),
3320        )
3321    }
3322}
3323
3324fn validate_subject_compatibility_map(
3325    subjects: &BTreeMap<String, String>,
3326) -> Result<(), ValidationError> {
3327    if subjects.len() > 1000 {
3328        return Err(ValidationError::new("too_many_subjects")
3329            .with_message("maximum 1000 per-subject compatibility entries".into()));
3330    }
3331    for (subject, level) in subjects {
3332        if subject.len() > 255 {
3333            return Err(ValidationError::new("subject_name_too_long")
3334                .with_message(format!("subject '{}' exceeds 255 characters", subject).into()));
3335        }
3336        validate_compatibility_level(level)?;
3337    }
3338    Ok(())
3339}
3340
3341impl Default for SchemaCompatibilitySpec {
3342    fn default() -> Self {
3343        Self {
3344            default_level: "BACKWARD".to_string(),
3345            allow_overrides: true,
3346            subjects: BTreeMap::new(),
3347        }
3348    }
3349}
3350
/// Schema format support configuration
// All four switches use `default_true` (defined outside this section of the
// file) as their serde default. `Validate` is derived, but no field carries a
// validator.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaFormatSpec {
    /// Enable Avro schema support
    #[serde(default = "default_true")]
    pub avro: bool,

    /// Enable JSON Schema support
    // Exposed as `jsonSchema` on the wire.
    #[serde(default = "default_true")]
    pub json_schema: bool,

    /// Enable Protobuf schema support
    #[serde(default = "default_true")]
    pub protobuf: bool,

    /// Strict schema validation
    // Exposed as `strictValidation` on the wire.
    #[serde(default = "default_true")]
    pub strict_validation: bool,
}
3371
3372impl Default for SchemaFormatSpec {
3373    fn default() -> Self {
3374        Self {
3375            avro: true,
3376            json_schema: true,
3377            protobuf: true,
3378            strict_validation: true,
3379        }
3380    }
3381}
3382
3383/// Schema contexts (multi-tenant) configuration
3384#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3385#[serde(rename_all = "camelCase")]
3386pub struct SchemaContextsSpec {
3387    /// Enable schema contexts for multi-tenant isolation
3388    #[serde(default)]
3389    pub enabled: bool,
3390
3391    /// Maximum number of contexts (0 = unlimited)
3392    #[serde(default)]
3393    #[validate(range(min = 0, max = 10000, message = "max contexts must be 0-10000"))]
3394    pub max_contexts: i32,
3395
3396    /// Pre-defined contexts to create on startup
3397    #[serde(default)]
3398    #[validate(length(max = 100, message = "maximum 100 pre-defined contexts"))]
3399    pub predefined: Vec<SchemaContextDefinition>,
3400}
3401
/// Pre-defined schema context definition
// `name` has no serde default and is therefore required on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaContextDefinition {
    /// Context name
    // Only the length is checked; no character-set restriction is applied here.
    #[validate(length(min = 1, max = 128, message = "context name must be 1-128 characters"))]
    pub name: String,

    /// Context description
    #[serde(default)]
    #[validate(length(max = 512, message = "description max 512 characters"))]
    pub description: Option<String>,

    /// Whether this context is active
    // `default_true` is defined outside this section of the file.
    #[serde(default = "default_true")]
    pub active: bool,
}
3419
3420/// Validation rules configuration for Schema Registry
3421#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3422#[serde(rename_all = "camelCase")]
3423pub struct SchemaValidationSpec {
3424    /// Enable content validation rules
3425    #[serde(default)]
3426    pub enabled: bool,
3427
3428    /// Maximum schema size in bytes
3429    #[serde(default = "default_max_schema_size")]
3430    #[validate(range(
3431        min = 1024,
3432        max = 10485760,
3433        message = "max schema size must be 1KB-10MB"
3434    ))]
3435    pub max_schema_size: i64,
3436
3437    /// Pre-defined validation rules
3438    #[serde(default)]
3439    #[validate(length(max = 100, message = "maximum 100 validation rules"))]
3440    pub rules: Vec<SchemaValidationRule>,
3441}
3442
/// Serde default for `SchemaValidationSpec::max_schema_size` (1 MiB).
fn default_max_schema_size() -> i64 {
    const ONE_MIB: i64 = 1024 * 1024;
    ONE_MIB
}
3446
3447impl Default for SchemaValidationSpec {
3448    fn default() -> Self {
3449        Self {
3450            enabled: false,
3451            max_schema_size: 1_048_576,
3452            rules: vec![],
3453        }
3454    }
3455}
3456
/// Validation rule definition
// `name`, `ruleType` and `pattern` have no serde default and are therefore
// required on input.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaValidationRule {
    /// Rule name
    #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
    pub name: String,

    /// Rule type: regex, field_exists, field_type, naming_convention, documentation
    #[validate(custom(function = "validate_rule_type"))]
    pub rule_type: String,

    /// Rule configuration/pattern
    // Only the length is checked here; the pattern is not parsed or compiled,
    // even when `ruleType` is "regex".
    #[validate(length(min = 1, max = 4096, message = "pattern must be 1-4096 characters"))]
    pub pattern: String,

    /// Subjects to apply this rule to (empty = all)
    // No validator is attached; entries are accepted as given.
    #[serde(default)]
    pub subjects: Vec<String>,

    /// Schema types to apply this rule to
    // No validator is attached; entries are accepted as given.
    #[serde(default)]
    pub schema_types: Vec<String>,

    /// Validation level: error, warning, info
    #[serde(default = "default_validation_level")]
    #[validate(custom(function = "validate_validation_level"))]
    pub level: String,

    /// Rule description
    #[serde(default)]
    #[validate(length(max = 512, message = "description max 512 characters"))]
    pub description: Option<String>,
}
3491
3492fn validate_rule_type(rule_type: &str) -> Result<(), ValidationError> {
3493    let valid_types = [
3494        "regex",
3495        "field_exists",
3496        "field_type",
3497        "naming_convention",
3498        "documentation",
3499    ];
3500    if valid_types.contains(&rule_type) {
3501        Ok(())
3502    } else {
3503        Err(ValidationError::new("invalid_rule_type").with_message(
3504            format!(
3505                "'{}' is not valid. Must be one of: {:?}",
3506                rule_type, valid_types
3507            )
3508            .into(),
3509        ))
3510    }
3511}
3512
/// Serde default for `SchemaValidationRule::level`.
fn default_validation_level() -> String {
    String::from("error")
}
3516
3517fn validate_validation_level(level: &str) -> Result<(), ValidationError> {
3518    match level {
3519        "error" | "warning" | "info" => Ok(()),
3520        _ => Err(ValidationError::new("invalid_validation_level")
3521            .with_message("validation level must be 'error', 'warning', or 'info'".into())),
3522    }
3523}
3524
3525/// Authentication configuration for Schema Registry
3526#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3527#[serde(rename_all = "camelCase")]
3528pub struct SchemaRegistryAuthSpec {
3529    /// Enable authentication
3530    #[serde(default)]
3531    pub enabled: bool,
3532
3533    /// Authentication method: basic, jwt, cedar
3534    #[serde(default)]
3535    #[validate(custom(function = "validate_auth_method"))]
3536    pub method: Option<String>,
3537
3538    /// Secret containing authentication credentials
3539    #[serde(default)]
3540    #[validate(custom(function = "validate_optional_k8s_name"))]
3541    pub credentials_secret_ref: Option<String>,
3542
3543    /// JWT/OIDC configuration (if method=jwt)
3544    #[serde(default)]
3545    #[validate(nested)]
3546    pub jwt: JwtAuthSpec,
3547
3548    /// Basic auth users (for method=basic, max 100)
3549    #[serde(default)]
3550    #[validate(length(max = 100, message = "maximum 100 users"))]
3551    pub users: Vec<SchemaRegistryUser>,
3552}
3553
3554fn validate_auth_method(method: &str) -> Result<(), ValidationError> {
3555    match method {
3556        "" | "basic" | "jwt" | "cedar" => Ok(()),
3557        _ => Err(ValidationError::new("invalid_auth_method")
3558            .with_message("auth method must be 'basic', 'jwt', or 'cedar'".into())),
3559    }
3560}
3561
/// JWT/OIDC authentication configuration
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `Default` is derived, so `JwtAuthSpec::default()` has empty
//   `username_claim` and `roles_claim`. The "sub"/"roles" values come from
//   the serde default functions and only apply while deserializing.
//   NOTE(review): confirm this divergence is intended; a unit test in this
//   file asserts the empty-string behaviour.
// - No field-level `#[validate(...)]` rule is declared on any field, so the
//   URL fields are accepted as arbitrary strings by this struct.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct JwtAuthSpec {
    /// OIDC issuer URL
    #[serde(default)]
    pub issuer_url: Option<String>,

    /// JWKS URL (for key validation)
    #[serde(default)]
    pub jwks_url: Option<String>,

    /// Required audience claim
    #[serde(default)]
    pub audience: Option<String>,

    /// Claim to use for username
    // Deserialization default: "sub" (see `default_username_claim`).
    #[serde(default = "default_username_claim")]
    pub username_claim: String,

    /// Claim to use for roles
    // Deserialization default: "roles" (see `default_roles_claim`).
    #[serde(default = "default_roles_claim")]
    pub roles_claim: String,
}
3586
/// Serde default for `JwtAuthSpec::username_claim`: the `sub` claim.
fn default_username_claim() -> String {
    String::from("sub")
}
3590
/// Serde default for `JwtAuthSpec::roles_claim`: the `roles` claim.
fn default_roles_claim() -> String {
    String::from("roles")
}
3594
/// Schema Registry user definition (for basic auth)
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `Default` is not derived and `username` has no serde default, so
//   `username` must be present when deserializing.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryUser {
    /// Username
    #[validate(length(min = 1, max = 128, message = "username must be 1-128 characters"))]
    pub username: String,

    /// Secret key containing password
    // NOTE(review): this is checked with the Kubernetes *name* validator.
    // Keys inside a Secret may legally contain `_` and `.`; confirm that
    // `validate_optional_k8s_name` does not reject such keys.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub password_secret_key: Option<String>,

    /// User role: admin, writer, reader
    // Defaults to "reader" when omitted (see `default_user_role`).
    #[serde(default = "default_user_role")]
    #[validate(custom(function = "validate_user_role"))]
    pub role: String,

    /// Allowed subjects (empty = all, for reader/writer roles)
    // No size or content rule is declared for this list.
    #[serde(default)]
    pub allowed_subjects: Vec<String>,
}
3617
/// Serde default for `SchemaRegistryUser::role`: the `reader` role.
fn default_user_role() -> String {
    String::from("reader")
}
3621
3622fn validate_user_role(role: &str) -> Result<(), ValidationError> {
3623    match role {
3624        "admin" | "writer" | "reader" => Ok(()),
3625        _ => Err(ValidationError::new("invalid_user_role")
3626            .with_message("role must be 'admin', 'writer', or 'reader'".into())),
3627    }
3628}
3629
/// TLS configuration for Schema Registry
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - All fields are optional on input (`#[serde(default)]`); the derived
//   `Default` is everything disabled / unset.
// - No cross-field rule is declared here: enabling a flag does not, on this
//   struct, require the matching secret name to be set.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryTlsSpec {
    /// Enable TLS for HTTP server
    #[serde(default)]
    pub enabled: bool,

    /// Secret containing TLS certificates
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// CA secret name for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,

    /// Enable TLS for broker connection
    #[serde(default)]
    pub broker_tls_enabled: bool,

    /// Secret for broker TLS certificates
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub broker_cert_secret_name: Option<String>,
}
3661
/// Metrics configuration for Schema Registry
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `Default` is implemented by hand for this type (not derived). Its
//   literals duplicate the serde default functions named below, so the two
//   must be kept in sync manually.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryMetricsSpec {
    /// Enable Prometheus metrics endpoint
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Metrics endpoint port
    // Only unprivileged ports are accepted (1024-65535); default 9090.
    #[serde(default = "default_schema_metrics_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,

    /// Metrics endpoint path
    // Default "/metrics"; no format rule is declared for this field.
    #[serde(default = "default_metrics_path")]
    pub path: String,

    /// Create ServiceMonitor for Prometheus Operator
    #[serde(default)]
    pub service_monitor_enabled: bool,

    /// ServiceMonitor scrape interval
    #[serde(default = "default_scrape_interval")]
    #[validate(custom(function = "validate_duration"))]
    pub scrape_interval: String,
}
3688
/// Serde default for `SchemaRegistryMetricsSpec::port`.
fn default_schema_metrics_port() -> i32 {
    const DEFAULT_METRICS_PORT: i32 = 9090;
    DEFAULT_METRICS_PORT
}
3692
/// Serde default for `SchemaRegistryMetricsSpec::path`.
fn default_metrics_path() -> String {
    String::from("/metrics")
}
3696
3697impl Default for SchemaRegistryMetricsSpec {
3698    fn default() -> Self {
3699        Self {
3700            enabled: true,
3701            port: 9090,
3702            path: "/metrics".to_string(),
3703            service_monitor_enabled: false,
3704            scrape_interval: "30s".to_string(),
3705        }
3706    }
3707}
3708
/// External registry configuration for mirroring/sync
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `Default` is derived, so `ExternalRegistrySpec::default()` has
//   `sync_interval_seconds == 0`. That is outside the 10-86400 range
//   declared below; the serde default (300) only applies to a deserialized
//   object that omits the key.
//   NOTE(review): confirm callers never validate a `Default::default()`
//   instance, or replace the derive with a manual impl.
// - No cross-field rule is declared here (e.g. `registry_type = "glue"` does
//   not, on this struct, require `glue_registry_arn`).
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ExternalRegistrySpec {
    /// Enable external registry integration
    #[serde(default)]
    pub enabled: bool,

    /// External registry type: confluent, glue
    // `validate_external_registry_type` also accepts the empty string.
    #[serde(default)]
    #[validate(custom(function = "validate_external_registry_type"))]
    pub registry_type: Option<String>,

    /// Confluent Schema Registry URL
    #[serde(default)]
    pub confluent_url: Option<String>,

    /// AWS Glue registry ARN
    #[serde(default)]
    pub glue_registry_arn: Option<String>,

    /// AWS region for Glue
    #[serde(default)]
    pub aws_region: Option<String>,

    /// Sync mode: mirror (read from external), push (write to external), bidirectional
    // `validate_sync_mode` also accepts the empty string.
    #[serde(default)]
    #[validate(custom(function = "validate_sync_mode"))]
    pub sync_mode: Option<String>,

    /// Subjects to sync (empty = all)
    #[serde(default)]
    pub sync_subjects: Vec<String>,

    /// Sync interval in seconds
    // Deserialization default: 300 (see `default_sync_interval`).
    #[serde(default = "default_sync_interval")]
    #[validate(range(
        min = 10,
        max = 86400,
        message = "sync interval must be 10-86400 seconds"
    ))]
    pub sync_interval_seconds: i32,

    /// Credentials secret reference
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub credentials_secret_ref: Option<String>,
}
3757
3758fn validate_external_registry_type(reg_type: &str) -> Result<(), ValidationError> {
3759    match reg_type {
3760        "" | "confluent" | "glue" => Ok(()),
3761        _ => Err(ValidationError::new("invalid_external_registry_type")
3762            .with_message("registry type must be 'confluent' or 'glue'".into())),
3763    }
3764}
3765
3766fn validate_sync_mode(mode: &str) -> Result<(), ValidationError> {
3767    match mode {
3768        "" | "mirror" | "push" | "bidirectional" => Ok(()),
3769        _ => Err(ValidationError::new("invalid_sync_mode")
3770            .with_message("sync mode must be 'mirror', 'push', or 'bidirectional'".into())),
3771    }
3772}
3773
/// Serde default for `ExternalRegistrySpec::sync_interval_seconds`.
fn default_sync_interval() -> i32 {
    // Five minutes, expressed in seconds.
    const FIVE_MINUTES_SECS: i32 = 5 * 60;
    FIVE_MINUTES_SECS
}
3777
/// Status of a RivvenSchemaRegistry resource
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `Validate` is not derived: this type carries no validation rules.
// - The numeric fields (`replicas` through `observed_generation`) have no
//   `#[serde(default)]`, unlike `phase`, `conditions` and `endpoints`.
//   NOTE(review): with serde's derive that should make them required when
//   deserializing a status object - confirm this is intended.
// - Timestamps are stored as plain strings; their format is not enforced here.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenSchemaRegistryStatus {
    /// Current phase of the registry
    #[serde(default)]
    pub phase: SchemaRegistryPhase,

    /// Total number of replicas
    pub replicas: i32,

    /// Number of ready replicas
    pub ready_replicas: i32,

    /// Number of schemas registered
    pub schemas_registered: i32,

    /// Number of subjects
    pub subjects_count: i32,

    /// Number of active contexts (if contexts enabled)
    pub contexts_count: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing registry state
    #[serde(default)]
    pub conditions: Vec<SchemaRegistryCondition>,

    /// Registry endpoints (URLs)
    #[serde(default)]
    pub endpoints: Vec<String>,

    /// Storage backend status
    pub storage_status: Option<String>,

    /// External registry sync status
    pub external_sync_status: Option<String>,

    /// Last successful external sync time
    pub last_sync_time: Option<String>,

    /// Last time the status was updated
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
3827
/// Phase of the Schema Registry lifecycle
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - No `#[serde(rename_all = ...)]` is applied, so the variants are
//   (de)serialized under their identifiers exactly as written below.
// - `Pending` is the `Default` variant.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum SchemaRegistryPhase {
    /// Registry is being created
    #[default]
    Pending,
    /// Registry is being provisioned
    Provisioning,
    /// Registry is running and healthy
    Running,
    /// Registry is updating
    Updating,
    /// Registry is in degraded state
    Degraded,
    /// Registry has failed
    Failed,
    /// Registry is being deleted
    Terminating,
}
3847
/// Condition describing an aspect of Schema Registry state
//
// Maintainer notes (plain `//` so the existing `///` text stays unchanged):
// - `condition_type` is exposed under the key `type`; the remaining fields
//   follow the struct-level camelCase renaming.
// - `condition_type` and `status` are free-form strings here; the values
//   listed in their docs are not enforced by this type.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SchemaRegistryCondition {
    /// Type of condition (Ready, BrokerConnected, StorageHealthy, ExternalSyncHealthy)
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    pub last_transition_time: Option<String>,
}
3868
3869impl RivvenSchemaRegistrySpec {
3870    /// Get the full container image including version
3871    pub fn get_image(&self) -> String {
3872        if let Some(ref image) = self.image {
3873            image.clone()
3874        } else {
3875            format!("ghcr.io/hupe1980/rivven-schema:{}", self.version)
3876        }
3877    }
3878
3879    /// Get labels for managed resources
3880    pub fn get_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3881        let mut labels = BTreeMap::new();
3882        labels.insert(
3883            "app.kubernetes.io/name".to_string(),
3884            "rivven-schema-registry".to_string(),
3885        );
3886        labels.insert(
3887            "app.kubernetes.io/instance".to_string(),
3888            registry_name.to_string(),
3889        );
3890        labels.insert(
3891            "app.kubernetes.io/component".to_string(),
3892            "schema-registry".to_string(),
3893        );
3894        labels.insert(
3895            "app.kubernetes.io/managed-by".to_string(),
3896            "rivven-operator".to_string(),
3897        );
3898        labels.insert(
3899            "app.kubernetes.io/version".to_string(),
3900            self.version.clone(),
3901        );
3902        labels
3903    }
3904
3905    /// Get selector labels for managed resources
3906    pub fn get_selector_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3907        let mut labels = BTreeMap::new();
3908        labels.insert(
3909            "app.kubernetes.io/name".to_string(),
3910            "rivven-schema-registry".to_string(),
3911        );
3912        labels.insert(
3913            "app.kubernetes.io/instance".to_string(),
3914            registry_name.to_string(),
3915        );
3916        labels
3917    }
3918}
3919
3920#[cfg(test)]
3921mod tests {
3922    use super::*;
3923
3924    #[test]
3925    fn test_default_spec() {
3926        let spec = RivvenClusterSpec {
3927            replicas: 3,
3928            version: "0.0.1".to_string(),
3929            image: None,
3930            image_pull_policy: "IfNotPresent".to_string(),
3931            image_pull_secrets: vec![],
3932            storage: StorageSpec::default(),
3933            resources: None,
3934            config: BrokerConfig::default(),
3935            tls: TlsSpec::default(),
3936            metrics: MetricsSpec::default(),
3937            affinity: None,
3938            node_selector: BTreeMap::new(),
3939            tolerations: vec![],
3940            pod_disruption_budget: PdbSpec::default(),
3941            service_account: None,
3942            pod_annotations: BTreeMap::new(),
3943            pod_labels: BTreeMap::new(),
3944            env: vec![],
3945            liveness_probe: ProbeSpec::default(),
3946            readiness_probe: ProbeSpec::default(),
3947            security_context: None,
3948            container_security_context: None,
3949        };
3950
3951        assert_eq!(spec.replicas, 3);
3952        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
3953    }
3954
3955    #[test]
3956    fn test_get_labels() {
3957        let spec = RivvenClusterSpec {
3958            replicas: 3,
3959            version: "0.0.1".to_string(),
3960            image: None,
3961            image_pull_policy: "IfNotPresent".to_string(),
3962            image_pull_secrets: vec![],
3963            storage: StorageSpec::default(),
3964            resources: None,
3965            config: BrokerConfig::default(),
3966            tls: TlsSpec::default(),
3967            metrics: MetricsSpec::default(),
3968            affinity: None,
3969            node_selector: BTreeMap::new(),
3970            tolerations: vec![],
3971            pod_disruption_budget: PdbSpec::default(),
3972            service_account: None,
3973            pod_annotations: BTreeMap::new(),
3974            pod_labels: BTreeMap::new(),
3975            env: vec![],
3976            liveness_probe: ProbeSpec::default(),
3977            readiness_probe: ProbeSpec::default(),
3978            security_context: None,
3979            container_security_context: None,
3980        };
3981
3982        let labels = spec.get_labels("my-cluster");
3983        assert_eq!(
3984            labels.get("app.kubernetes.io/name"),
3985            Some(&"rivven".to_string())
3986        );
3987        assert_eq!(
3988            labels.get("app.kubernetes.io/instance"),
3989            Some(&"my-cluster".to_string())
3990        );
3991    }
3992
3993    #[test]
3994    fn test_custom_image() {
3995        let spec = RivvenClusterSpec {
3996            replicas: 1,
3997            version: "0.0.1".to_string(),
3998            image: Some("my-registry/rivven:custom".to_string()),
3999            image_pull_policy: "Always".to_string(),
4000            image_pull_secrets: vec![],
4001            storage: StorageSpec::default(),
4002            resources: None,
4003            config: BrokerConfig::default(),
4004            tls: TlsSpec::default(),
4005            metrics: MetricsSpec::default(),
4006            affinity: None,
4007            node_selector: BTreeMap::new(),
4008            tolerations: vec![],
4009            pod_disruption_budget: PdbSpec::default(),
4010            service_account: None,
4011            pod_annotations: BTreeMap::new(),
4012            pod_labels: BTreeMap::new(),
4013            env: vec![],
4014            liveness_probe: ProbeSpec::default(),
4015            readiness_probe: ProbeSpec::default(),
4016            security_context: None,
4017            container_security_context: None,
4018        };
4019
4020        assert_eq!(spec.get_image(), "my-registry/rivven:custom");
4021    }
4022
4023    #[test]
4024    fn test_cluster_phase_default() {
4025        let phase = ClusterPhase::default();
4026        assert_eq!(phase, ClusterPhase::Pending);
4027    }
4028
4029    #[test]
4030    fn test_storage_spec_default() {
4031        let storage = StorageSpec::default();
4032        assert_eq!(storage.size, "10Gi");
4033        assert!(storage.storage_class_name.is_none());
4034    }
4035
4036    #[test]
4037    fn test_broker_config_defaults() {
4038        let config = BrokerConfig::default();
4039        assert_eq!(config.default_partitions, 3);
4040        assert_eq!(config.default_replication_factor, 2);
4041        assert!(config.auto_create_topics);
4042    }
4043
4044    #[test]
4045    fn test_probe_spec_defaults() {
4046        let probe = ProbeSpec::default();
4047        assert!(probe.enabled);
4048        assert_eq!(probe.initial_delay_seconds, 30);
4049        assert_eq!(probe.period_seconds, 10);
4050    }
4051
4052    #[test]
4053    fn test_validate_quantity_valid() {
4054        assert!(validate_quantity("10Gi").is_ok());
4055        assert!(validate_quantity("100Mi").is_ok());
4056        assert!(validate_quantity("1Ti").is_ok());
4057        assert!(validate_quantity("500").is_ok());
4058        assert!(validate_quantity("1.5Gi").is_ok());
4059    }
4060
4061    #[test]
4062    fn test_validate_quantity_invalid() {
4063        assert!(validate_quantity("10GB").is_err()); // Wrong suffix
4064        assert!(validate_quantity("abc").is_err()); // Not a number
4065        assert!(validate_quantity("-10Gi").is_err()); // Negative
4066        assert!(validate_quantity("").is_err()); // Empty
4067    }
4068
4069    #[test]
4070    fn test_validate_k8s_name_valid() {
4071        assert!(validate_k8s_name("my-cluster").is_ok());
4072        assert!(validate_k8s_name("cluster123").is_ok());
4073        assert!(validate_k8s_name("a").is_ok());
4074    }
4075
4076    #[test]
4077    fn test_validate_k8s_name_invalid() {
4078        assert!(validate_k8s_name("My-Cluster").is_err()); // Uppercase
4079        assert!(validate_k8s_name("-cluster").is_err()); // Starts with dash
4080        assert!(validate_k8s_name("cluster-").is_err()); // Ends with dash
4081        assert!(validate_k8s_name("cluster_name").is_err()); // Underscore
4082    }
4083
4084    #[test]
4085    fn test_validate_compression_type() {
4086        assert!(validate_compression_type("lz4").is_ok());
4087        assert!(validate_compression_type("zstd").is_ok());
4088        assert!(validate_compression_type("none").is_ok());
4089        assert!(validate_compression_type("invalid").is_err());
4090    }
4091
4092    #[test]
4093    fn test_validate_segment_size() {
4094        assert!(validate_segment_size(1_048_576).is_ok()); // 1MB - minimum
4095        assert!(validate_segment_size(10_737_418_240).is_ok()); // 10GB - maximum
4096        assert!(validate_segment_size(1_073_741_824).is_ok()); // 1GB - valid
4097        assert!(validate_segment_size(1_000).is_err()); // Too small
4098        assert!(validate_segment_size(20_000_000_000).is_err()); // Too large
4099    }
4100
4101    #[test]
4102    fn test_validate_message_size() {
4103        assert!(validate_message_size(1_024).is_ok()); // 1KB - minimum
4104        assert!(validate_message_size(104_857_600).is_ok()); // 100MB - maximum
4105        assert!(validate_message_size(1_048_576).is_ok()); // 1MB - valid
4106        assert!(validate_message_size(100).is_err()); // Too small
4107        assert!(validate_message_size(200_000_000).is_err()); // Too large
4108    }
4109
4110    #[test]
4111    fn test_validate_pull_policy() {
4112        assert!(validate_pull_policy("Always").is_ok());
4113        assert!(validate_pull_policy("IfNotPresent").is_ok());
4114        assert!(validate_pull_policy("Never").is_ok());
4115        assert!(validate_pull_policy("always").is_err()); // Wrong case
4116        assert!(validate_pull_policy("Invalid").is_err());
4117    }
4118
4119    #[test]
4120    fn test_validate_duration() {
4121        assert!(validate_duration("30s").is_ok());
4122        assert!(validate_duration("1m").is_ok());
4123        assert!(validate_duration("5m30s").is_ok());
4124        assert!(validate_duration("1h").is_ok());
4125        assert!(validate_duration("invalid").is_err());
4126        assert!(validate_duration("30").is_err()); // Missing unit
4127    }
4128
4129    #[test]
4130    fn test_validate_access_modes() {
4131        assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
4132        assert!(
4133            validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
4134                .is_ok()
4135        );
4136        assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
4137    }
4138
4139    // RivvenConnect CRD tests
4140    #[test]
4141    fn test_connect_spec_defaults() {
4142        let spec = RivvenConnectSpec {
4143            cluster_ref: ClusterReference {
4144                name: "my-cluster".to_string(),
4145                namespace: None,
4146            },
4147            replicas: 1,
4148            version: "0.0.1".to_string(),
4149            image: None,
4150            image_pull_policy: "IfNotPresent".to_string(),
4151            image_pull_secrets: vec![],
4152            resources: None,
4153            config: ConnectConfigSpec::default(),
4154            sources: vec![],
4155            sinks: vec![],
4156            settings: GlobalConnectSettings::default(),
4157            tls: ConnectTlsSpec::default(),
4158            pod_annotations: BTreeMap::new(),
4159            pod_labels: BTreeMap::new(),
4160            env: vec![],
4161            node_selector: BTreeMap::new(),
4162            tolerations: vec![],
4163            affinity: None,
4164            service_account: None,
4165            security_context: None,
4166            container_security_context: None,
4167        };
4168        assert_eq!(spec.replicas, 1);
4169    }
4170
4171    #[test]
4172    fn test_connect_phase_default() {
4173        let phase = ConnectPhase::default();
4174        assert_eq!(phase, ConnectPhase::Pending);
4175    }
4176
4177    #[test]
4178    fn test_validate_connector_type() {
4179        assert!(validate_connector_type("postgres-cdc").is_ok());
4180        assert!(validate_connector_type("mysql-cdc").is_ok());
4181        assert!(validate_connector_type("http").is_ok());
4182        assert!(validate_connector_type("stdout").is_ok());
4183        assert!(validate_connector_type("s3").is_ok());
4184        assert!(validate_connector_type("datagen").is_ok());
4185        assert!(validate_connector_type("custom-connector").is_ok());
4186    }
4187
4188    #[test]
4189    fn test_validate_start_offset() {
4190        assert!(validate_start_offset("earliest").is_ok());
4191        assert!(validate_start_offset("latest").is_ok());
4192        assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
4193        assert!(validate_start_offset("2024-06-15T12:30:00+02:00").is_ok());
4194        assert!(validate_start_offset("invalid").is_err());
4195        // These inputs are correctly rejected:
4196        assert!(validate_start_offset("not-a-dateT:").is_err());
4197        assert!(validate_start_offset("xTy:z").is_err());
4198        assert!(validate_start_offset("2024-13-01T00:00:00Z").is_err()); // invalid month
4199        assert!(validate_start_offset("2024-01-32T00:00:00Z").is_err()); // invalid day
4200    }
4201
4202    #[test]
4203    fn test_validate_image_valid() {
4204        assert!(validate_image("nginx").is_ok());
4205        assert!(validate_image("nginx:latest").is_ok());
4206        assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
4207        assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
4208        assert!(validate_image("localhost:5000/myimage").is_ok());
4209        assert!(validate_image("").is_ok()); // Empty allowed, uses default
4210    }
4211
4212    #[test]
4213    fn test_validate_image_invalid() {
4214        assert!(validate_image("/absolute/path").is_err()); // Starts with /
4215        assert!(validate_image("-invalid").is_err()); // Starts with -
4216        assert!(validate_image("image..path").is_err()); // Contains ..
4217                                                         // Very long image name
4218        let long_name = "a".repeat(300);
4219        assert!(validate_image(&long_name).is_err());
4220    }
4221
4222    #[test]
4223    fn test_validate_node_selector() {
4224        let mut selectors = BTreeMap::new();
4225        selectors.insert("node-type".to_string(), "compute".to_string());
4226        assert!(validate_node_selector(&selectors).is_ok());
4227
4228        // Too many selectors
4229        let mut many = BTreeMap::new();
4230        for i in 0..25 {
4231            many.insert(format!("key-{}", i), "value".to_string());
4232        }
4233        assert!(validate_node_selector(&many).is_err());
4234    }
4235
4236    #[test]
4237    fn test_validate_annotations() {
4238        let mut annotations = BTreeMap::new();
4239        annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
4240        assert!(validate_annotations(&annotations).is_ok());
4241
4242        // Too many annotations
4243        let mut many = BTreeMap::new();
4244        for i in 0..55 {
4245            many.insert(format!("annotation-{}", i), "value".to_string());
4246        }
4247        assert!(validate_annotations(&many).is_err());
4248    }
4249
4250    #[test]
4251    fn test_validate_labels() {
4252        let mut labels = BTreeMap::new();
4253        labels.insert("team".to_string(), "platform".to_string());
4254        assert!(validate_labels(&labels).is_ok());
4255
4256        // Reserved prefix
4257        let mut reserved = BTreeMap::new();
4258        reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
4259        assert!(validate_labels(&reserved).is_err());
4260    }
4261
4262    #[test]
4263    fn test_validate_raw_config() {
4264        let mut config = BTreeMap::new();
4265        config.insert("custom.setting".to_string(), "value".to_string());
4266        assert!(validate_raw_config(&config).is_ok());
4267
4268        // Forbidden key
4269        let mut forbidden = BTreeMap::new();
4270        forbidden.insert("command".to_string(), "/bin/sh".to_string());
4271        assert!(validate_raw_config(&forbidden).is_err());
4272
4273        // Too many entries
4274        let mut many = BTreeMap::new();
4275        for i in 0..55 {
4276            many.insert(format!("config-{}", i), "value".to_string());
4277        }
4278        assert!(validate_raw_config(&many).is_err());
4279    }
4280
4281    #[test]
4282    fn test_validate_int_or_percent() {
4283        assert!(validate_optional_int_or_percent("1").is_ok());
4284        assert!(validate_optional_int_or_percent("25%").is_ok());
4285        assert!(validate_optional_int_or_percent("100%").is_ok());
4286        assert!(validate_optional_int_or_percent("").is_ok()); // Empty allowed
4287        assert!(validate_optional_int_or_percent("abc").is_err());
4288        assert!(validate_optional_int_or_percent("25%%").is_err());
4289    }
4290
4291    #[test]
4292    fn test_tls_spec_default() {
4293        let tls = TlsSpec::default();
4294        assert!(!tls.enabled);
4295        assert!(tls.cert_secret_name.is_none());
4296        assert!(!tls.mtls_enabled);
4297    }
4298
4299    #[test]
4300    fn test_metrics_spec_default() {
4301        let metrics = MetricsSpec::default();
4302        assert!(metrics.enabled);
4303        assert_eq!(metrics.port, 9090);
4304    }
4305
4306    #[test]
4307    fn test_pdb_spec_default() {
4308        let pdb = PdbSpec::default();
4309        assert!(pdb.enabled);
4310        assert!(pdb.min_available.is_none());
4311        assert_eq!(pdb.max_unavailable, Some("1".to_string()));
4312    }
4313
4314    #[test]
4315    fn test_service_monitor_labels() {
4316        let mut labels = BTreeMap::new();
4317        labels.insert("release".to_string(), "prometheus".to_string());
4318        assert!(validate_service_monitor_labels(&labels).is_ok());
4319
4320        // Too many labels
4321        let mut many = BTreeMap::new();
4322        for i in 0..15 {
4323            many.insert(format!("label-{}", i), "value".to_string());
4324        }
4325        assert!(validate_service_monitor_labels(&many).is_err());
4326    }
4327
4328    #[test]
4329    fn test_cluster_condition_time_format() {
4330        let condition = ClusterCondition {
4331            condition_type: "Ready".to_string(),
4332            status: "True".to_string(),
4333            last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4334            reason: Some("AllReplicasReady".to_string()),
4335            message: Some("All replicas are ready".to_string()),
4336        };
4337        assert!(condition.last_transition_time.unwrap().contains('T'));
4338    }
4339
4340    // ========================================================================
4341    // RivvenSchemaRegistry CRD Tests
4342    // ========================================================================
4343
4344    #[test]
4345    fn test_schema_registry_phase_default() {
4346        let phase = SchemaRegistryPhase::default();
4347        assert_eq!(phase, SchemaRegistryPhase::Pending);
4348    }
4349
4350    #[test]
4351    fn test_schema_registry_server_spec_default() {
4352        let spec = SchemaRegistryServerSpec::default();
4353        assert_eq!(spec.port, 8081);
4354        assert_eq!(spec.bind_address, "0.0.0.0");
4355        assert_eq!(spec.timeout_seconds, 30);
4356        assert_eq!(spec.max_request_size, 10_485_760);
4357        assert!(!spec.cors_enabled);
4358    }
4359
4360    #[test]
4361    fn test_schema_registry_storage_spec_default() {
4362        let spec = SchemaRegistryStorageSpec::default();
4363        assert_eq!(spec.mode, "broker");
4364        assert_eq!(spec.topic, "_schemas");
4365        assert_eq!(spec.replication_factor, 3);
4366        assert_eq!(spec.partitions, 1);
4367        assert!(spec.normalize);
4368    }
4369
4370    #[test]
4371    fn test_validate_storage_mode() {
4372        assert!(validate_storage_mode("memory").is_ok());
4373        assert!(validate_storage_mode("broker").is_ok());
4374        assert!(validate_storage_mode("invalid").is_err());
4375    }
4376
4377    #[test]
4378    fn test_schema_compatibility_spec_default() {
4379        let spec = SchemaCompatibilitySpec::default();
4380        assert_eq!(spec.default_level, "BACKWARD");
4381        assert!(spec.allow_overrides);
4382        assert!(spec.subjects.is_empty());
4383    }
4384
4385    #[test]
4386    fn test_validate_compatibility_level() {
4387        assert!(validate_compatibility_level("BACKWARD").is_ok());
4388        assert!(validate_compatibility_level("BACKWARD_TRANSITIVE").is_ok());
4389        assert!(validate_compatibility_level("FORWARD").is_ok());
4390        assert!(validate_compatibility_level("FORWARD_TRANSITIVE").is_ok());
4391        assert!(validate_compatibility_level("FULL").is_ok());
4392        assert!(validate_compatibility_level("FULL_TRANSITIVE").is_ok());
4393        assert!(validate_compatibility_level("NONE").is_ok());
4394        assert!(validate_compatibility_level("invalid").is_err());
4395        assert!(validate_compatibility_level("backward").is_err()); // Case sensitive
4396    }
4397
4398    #[test]
4399    fn test_schema_format_spec_default() {
4400        let spec = SchemaFormatSpec::default();
4401        assert!(spec.avro);
4402        assert!(spec.json_schema);
4403        assert!(spec.protobuf);
4404        assert!(spec.strict_validation);
4405    }
4406
4407    #[test]
4408    fn test_schema_contexts_spec_default() {
4409        let spec = SchemaContextsSpec::default();
4410        assert!(!spec.enabled);
4411        assert_eq!(spec.max_contexts, 0);
4412        assert!(spec.predefined.is_empty());
4413    }
4414
4415    #[test]
4416    fn test_schema_validation_spec_default() {
4417        let spec = SchemaValidationSpec::default();
4418        assert!(!spec.enabled);
4419        assert_eq!(spec.max_schema_size, 1_048_576);
4420        assert!(spec.rules.is_empty());
4421    }
4422
4423    #[test]
4424    fn test_validate_rule_type() {
4425        assert!(validate_rule_type("regex").is_ok());
4426        assert!(validate_rule_type("field_exists").is_ok());
4427        assert!(validate_rule_type("field_type").is_ok());
4428        assert!(validate_rule_type("naming_convention").is_ok());
4429        assert!(validate_rule_type("documentation").is_ok());
4430        assert!(validate_rule_type("invalid").is_err());
4431    }
4432
4433    #[test]
4434    fn test_validate_validation_level() {
4435        assert!(validate_validation_level("error").is_ok());
4436        assert!(validate_validation_level("warning").is_ok());
4437        assert!(validate_validation_level("info").is_ok());
4438        assert!(validate_validation_level("invalid").is_err());
4439    }
4440
4441    #[test]
4442    fn test_schema_registry_auth_spec_default() {
4443        let spec = SchemaRegistryAuthSpec::default();
4444        assert!(!spec.enabled);
4445        assert!(spec.method.is_none());
4446        assert!(spec.credentials_secret_ref.is_none());
4447    }
4448
4449    #[test]
4450    fn test_validate_auth_method() {
4451        assert!(validate_auth_method("basic").is_ok());
4452        assert!(validate_auth_method("jwt").is_ok());
4453        assert!(validate_auth_method("cedar").is_ok());
4454        assert!(validate_auth_method("").is_ok());
4455        assert!(validate_auth_method("invalid").is_err());
4456    }
4457
4458    #[test]
4459    fn test_jwt_auth_spec_default() {
4460        let spec = JwtAuthSpec::default();
4461        assert!(spec.issuer_url.is_none());
4462        assert!(spec.jwks_url.is_none());
4463        // Note: Default::default() gives empty strings; serde defaults apply during deserialization
4464        assert!(spec.username_claim.is_empty());
4465        assert!(spec.roles_claim.is_empty());
4466    }
4467
4468    #[test]
4469    fn test_validate_user_role() {
4470        assert!(validate_user_role("admin").is_ok());
4471        assert!(validate_user_role("writer").is_ok());
4472        assert!(validate_user_role("reader").is_ok());
4473        assert!(validate_user_role("invalid").is_err());
4474    }
4475
4476    #[test]
4477    fn test_schema_registry_tls_spec_default() {
4478        let spec = SchemaRegistryTlsSpec::default();
4479        assert!(!spec.enabled);
4480        assert!(spec.cert_secret_name.is_none());
4481        assert!(!spec.mtls_enabled);
4482        assert!(!spec.broker_tls_enabled);
4483    }
4484
4485    #[test]
4486    fn test_schema_registry_metrics_spec_default() {
4487        let spec = SchemaRegistryMetricsSpec::default();
4488        assert!(spec.enabled);
4489        assert_eq!(spec.port, 9090);
4490        assert_eq!(spec.path, "/metrics");
4491        assert!(!spec.service_monitor_enabled);
4492        assert_eq!(spec.scrape_interval, "30s");
4493    }
4494
4495    #[test]
4496    fn test_external_registry_spec_default() {
4497        let spec = ExternalRegistrySpec::default();
4498        assert!(!spec.enabled);
4499        assert!(spec.registry_type.is_none());
4500        assert!(spec.confluent_url.is_none());
4501        assert!(spec.glue_registry_arn.is_none());
4502        // Note: Default::default() gives 0; serde default (300) applies during deserialization
4503        assert_eq!(spec.sync_interval_seconds, 0);
4504    }
4505
4506    #[test]
4507    fn test_validate_external_registry_type() {
4508        assert!(validate_external_registry_type("confluent").is_ok());
4509        assert!(validate_external_registry_type("glue").is_ok());
4510        assert!(validate_external_registry_type("").is_ok());
4511        assert!(validate_external_registry_type("invalid").is_err());
4512    }
4513
4514    #[test]
4515    fn test_validate_sync_mode() {
4516        assert!(validate_sync_mode("mirror").is_ok());
4517        assert!(validate_sync_mode("push").is_ok());
4518        assert!(validate_sync_mode("bidirectional").is_ok());
4519        assert!(validate_sync_mode("").is_ok());
4520        assert!(validate_sync_mode("invalid").is_err());
4521    }
4522
4523    #[test]
4524    fn test_schema_registry_spec_get_image_default() {
4525        let spec = RivvenSchemaRegistrySpec {
4526            cluster_ref: ClusterReference {
4527                name: "test".to_string(),
4528                namespace: None,
4529            },
4530            replicas: 1,
4531            version: "0.0.1".to_string(),
4532            image: None,
4533            image_pull_policy: "IfNotPresent".to_string(),
4534            image_pull_secrets: vec![],
4535            resources: None,
4536            server: SchemaRegistryServerSpec::default(),
4537            storage: SchemaRegistryStorageSpec::default(),
4538            compatibility: SchemaCompatibilitySpec::default(),
4539            schemas: SchemaFormatSpec::default(),
4540            contexts: SchemaContextsSpec::default(),
4541            validation: SchemaValidationSpec::default(),
4542            auth: SchemaRegistryAuthSpec::default(),
4543            tls: SchemaRegistryTlsSpec::default(),
4544            metrics: SchemaRegistryMetricsSpec::default(),
4545            external: ExternalRegistrySpec::default(),
4546            pod_annotations: BTreeMap::new(),
4547            pod_labels: BTreeMap::new(),
4548            env: vec![],
4549            node_selector: BTreeMap::new(),
4550            tolerations: vec![],
4551            affinity: None,
4552            service_account: None,
4553            security_context: None,
4554            container_security_context: None,
4555            liveness_probe: ProbeSpec::default(),
4556            readiness_probe: ProbeSpec::default(),
4557            pod_disruption_budget: PdbSpec::default(),
4558        };
4559        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-schema:0.0.1");
4560    }
4561
4562    #[test]
4563    fn test_schema_registry_spec_get_image_custom() {
4564        let spec = RivvenSchemaRegistrySpec {
4565            cluster_ref: ClusterReference {
4566                name: "test".to_string(),
4567                namespace: None,
4568            },
4569            replicas: 1,
4570            version: "0.0.1".to_string(),
4571            image: Some("custom/schema-registry:latest".to_string()),
4572            image_pull_policy: "IfNotPresent".to_string(),
4573            image_pull_secrets: vec![],
4574            resources: None,
4575            server: SchemaRegistryServerSpec::default(),
4576            storage: SchemaRegistryStorageSpec::default(),
4577            compatibility: SchemaCompatibilitySpec::default(),
4578            schemas: SchemaFormatSpec::default(),
4579            contexts: SchemaContextsSpec::default(),
4580            validation: SchemaValidationSpec::default(),
4581            auth: SchemaRegistryAuthSpec::default(),
4582            tls: SchemaRegistryTlsSpec::default(),
4583            metrics: SchemaRegistryMetricsSpec::default(),
4584            external: ExternalRegistrySpec::default(),
4585            pod_annotations: BTreeMap::new(),
4586            pod_labels: BTreeMap::new(),
4587            env: vec![],
4588            node_selector: BTreeMap::new(),
4589            tolerations: vec![],
4590            affinity: None,
4591            service_account: None,
4592            security_context: None,
4593            container_security_context: None,
4594            liveness_probe: ProbeSpec::default(),
4595            readiness_probe: ProbeSpec::default(),
4596            pod_disruption_budget: PdbSpec::default(),
4597        };
4598        assert_eq!(spec.get_image(), "custom/schema-registry:latest");
4599    }
4600
4601    #[test]
4602    fn test_schema_registry_spec_get_labels() {
4603        let spec = RivvenSchemaRegistrySpec {
4604            cluster_ref: ClusterReference {
4605                name: "test".to_string(),
4606                namespace: None,
4607            },
4608            replicas: 1,
4609            version: "0.0.1".to_string(),
4610            image: None,
4611            image_pull_policy: "IfNotPresent".to_string(),
4612            image_pull_secrets: vec![],
4613            resources: None,
4614            server: SchemaRegistryServerSpec::default(),
4615            storage: SchemaRegistryStorageSpec::default(),
4616            compatibility: SchemaCompatibilitySpec::default(),
4617            schemas: SchemaFormatSpec::default(),
4618            contexts: SchemaContextsSpec::default(),
4619            validation: SchemaValidationSpec::default(),
4620            auth: SchemaRegistryAuthSpec::default(),
4621            tls: SchemaRegistryTlsSpec::default(),
4622            metrics: SchemaRegistryMetricsSpec::default(),
4623            external: ExternalRegistrySpec::default(),
4624            pod_annotations: BTreeMap::new(),
4625            pod_labels: BTreeMap::new(),
4626            env: vec![],
4627            node_selector: BTreeMap::new(),
4628            tolerations: vec![],
4629            affinity: None,
4630            service_account: None,
4631            security_context: None,
4632            container_security_context: None,
4633            liveness_probe: ProbeSpec::default(),
4634            readiness_probe: ProbeSpec::default(),
4635            pod_disruption_budget: PdbSpec::default(),
4636        };
4637
4638        let labels = spec.get_labels("my-registry");
4639        assert_eq!(
4640            labels.get("app.kubernetes.io/name"),
4641            Some(&"rivven-schema-registry".to_string())
4642        );
4643        assert_eq!(
4644            labels.get("app.kubernetes.io/instance"),
4645            Some(&"my-registry".to_string())
4646        );
4647        assert_eq!(
4648            labels.get("app.kubernetes.io/component"),
4649            Some(&"schema-registry".to_string())
4650        );
4651    }
4652
4653    #[test]
4654    fn test_schema_registry_condition_time_format() {
4655        let condition = SchemaRegistryCondition {
4656            condition_type: "Ready".to_string(),
4657            status: "True".to_string(),
4658            last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4659            reason: Some("AllReplicasReady".to_string()),
4660            message: Some("All replicas are ready".to_string()),
4661        };
4662        assert!(condition.last_transition_time.unwrap().contains('T'));
4663    }
4664
4665    #[test]
4666    fn test_validate_subject_compatibility_map() {
4667        let mut subjects = BTreeMap::new();
4668        subjects.insert("orders-value".to_string(), "BACKWARD".to_string());
4669        subjects.insert("users-value".to_string(), "FULL".to_string());
4670        assert!(validate_subject_compatibility_map(&subjects).is_ok());
4671
4672        // Invalid compatibility level
4673        let mut invalid = BTreeMap::new();
4674        invalid.insert("test".to_string(), "invalid".to_string());
4675        assert!(validate_subject_compatibility_map(&invalid).is_err());
4676
4677        // Subject name too long
4678        let mut long_name = BTreeMap::new();
4679        long_name.insert("a".repeat(300), "BACKWARD".to_string());
4680        assert!(validate_subject_compatibility_map(&long_name).is_err());
4681    }
4682
4683    // ========================================================================
4684    // Advanced CDC Configuration Tests
4685    // ========================================================================
4686
4687    #[test]
4688    fn test_snapshot_cdc_config_spec_serde_defaults() {
4689        // Test that serde deserialization applies default functions
4690        let json = r#"{}"#;
4691        let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4692        assert_eq!(config.batch_size, 10_000);
4693        assert_eq!(config.parallel_tables, 4);
4694        assert_eq!(config.query_timeout_secs, 300);
4695        assert_eq!(config.throttle_delay_ms, 0);
4696        assert_eq!(config.max_retries, 3);
4697        assert!(config.include_tables.is_empty());
4698        assert!(config.exclude_tables.is_empty());
4699    }
4700
4701    #[test]
4702    fn test_snapshot_cdc_config_spec_validation() {
4703        let json = r#"{"batchSize": 50}"#;
4704        let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4705        assert!(config.validate().is_err());
4706
4707        let json = r#"{"batchSize": 5000, "parallelTables": 50}"#;
4708        let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4709        assert!(config.validate().is_err());
4710    }
4711
4712    #[test]
4713    fn test_incremental_snapshot_spec_serde_defaults() {
4714        let json = r#"{}"#;
4715        let config: IncrementalSnapshotSpec = serde_json::from_str(json).unwrap();
4716        assert!(!config.enabled);
4717        assert_eq!(config.chunk_size, 1024);
4718        assert!(config.watermark_strategy.is_empty());
4719        assert_eq!(config.max_concurrent_chunks, 1);
4720    }
4721
4722    #[test]
4723    fn test_validate_watermark_strategy() {
4724        assert!(validate_watermark_strategy("insert").is_ok());
4725        assert!(validate_watermark_strategy("update_and_insert").is_ok());
4726        assert!(validate_watermark_strategy("").is_ok());
4727        assert!(validate_watermark_strategy("invalid").is_err());
4728    }
4729
4730    #[test]
4731    fn test_heartbeat_cdc_spec_serde_defaults() {
4732        let json = r#"{}"#;
4733        let config: HeartbeatCdcSpec = serde_json::from_str(json).unwrap();
4734        assert!(!config.enabled);
4735        assert_eq!(config.interval_secs, 10);
4736        assert_eq!(config.max_lag_secs, 300);
4737        assert!(!config.emit_events);
4738    }
4739
4740    #[test]
4741    fn test_deduplication_cdc_spec_serde_defaults() {
4742        let json = r#"{}"#;
4743        let config: DeduplicationCdcSpec = serde_json::from_str(json).unwrap();
4744        assert!(!config.enabled);
4745        assert_eq!(config.bloom_expected_insertions, 100_000);
4746        assert_eq!(config.bloom_fpp, 0.01);
4747        assert_eq!(config.lru_size, 10_000);
4748        assert_eq!(config.window_secs, 3600);
4749    }
4750
4751    #[test]
4752    fn test_transaction_topic_spec_serde_defaults() {
4753        let json = r#"{}"#;
4754        let config: TransactionTopicSpec = serde_json::from_str(json).unwrap();
4755        assert!(!config.enabled);
4756        assert!(config.topic_name.is_none());
4757        assert!(config.include_data_collections);
4758        assert_eq!(config.min_events_threshold, 0);
4759    }
4760
4761    #[test]
4762    fn test_schema_change_topic_spec_serde_defaults() {
4763        let json = r#"{}"#;
4764        let config: SchemaChangeTopicSpec = serde_json::from_str(json).unwrap();
4765        assert!(!config.enabled);
4766        assert!(config.topic_name.is_none());
4767        assert!(config.include_columns);
4768        assert!(config.schemas.is_empty());
4769    }
4770
4771    #[test]
4772    fn test_tombstone_cdc_spec_serde_defaults() {
4773        let json = r#"{}"#;
4774        let config: TombstoneCdcSpec = serde_json::from_str(json).unwrap();
4775        assert!(!config.enabled);
4776        assert!(config.after_delete);
4777        assert!(config.behavior.is_empty());
4778    }
4779
4780    #[test]
4781    fn test_validate_tombstone_behavior() {
4782        assert!(validate_tombstone_behavior("emit_null").is_ok());
4783        assert!(validate_tombstone_behavior("emit_with_key").is_ok());
4784        assert!(validate_tombstone_behavior("").is_ok());
4785        assert!(validate_tombstone_behavior("invalid").is_err());
4786    }
4787
4788    #[test]
4789    fn test_field_encryption_spec_serde_defaults() {
4790        let json = r#"{}"#;
4791        let config: FieldEncryptionSpec = serde_json::from_str(json).unwrap();
4792        assert!(!config.enabled);
4793        assert!(config.key_secret_ref.is_none());
4794        assert!(config.fields.is_empty());
4795        assert_eq!(config.algorithm, "aes-256-gcm");
4796    }
4797
4798    #[test]
4799    fn test_read_only_replica_spec_serde_defaults() {
4800        let json = r#"{}"#;
4801        let config: ReadOnlyReplicaSpec = serde_json::from_str(json).unwrap();
4802        assert!(!config.enabled);
4803        assert_eq!(config.lag_threshold_ms, 5000);
4804        assert!(config.deduplicate);
4805        assert!(config.watermark_source.is_empty());
4806    }
4807
4808    #[test]
4809    fn test_validate_watermark_source() {
4810        assert!(validate_watermark_source("primary").is_ok());
4811        assert!(validate_watermark_source("replica").is_ok());
4812        assert!(validate_watermark_source("").is_ok());
4813        assert!(validate_watermark_source("invalid").is_err());
4814    }
4815
4816    #[test]
4817    fn test_event_router_spec_serde_defaults() {
4818        let json = r#"{}"#;
4819        let config: EventRouterSpec = serde_json::from_str(json).unwrap();
4820        assert!(!config.enabled);
4821        assert!(config.default_destination.is_none());
4822        assert!(config.dead_letter_queue.is_none());
4823        assert!(!config.drop_unroutable);
4824        assert!(config.rules.is_empty());
4825    }
4826
4827    #[test]
4828    fn test_validate_route_condition_type() {
4829        assert!(validate_route_condition_type("always").is_ok());
4830        assert!(validate_route_condition_type("table").is_ok());
4831        assert!(validate_route_condition_type("table_pattern").is_ok());
4832        assert!(validate_route_condition_type("schema").is_ok());
4833        assert!(validate_route_condition_type("operation").is_ok());
4834        assert!(validate_route_condition_type("field_equals").is_ok());
4835        assert!(validate_route_condition_type("field_exists").is_ok());
4836        assert!(validate_route_condition_type("invalid").is_err());
4837    }
4838
4839    #[test]
4840    fn test_partitioner_spec_serde_defaults() {
4841        let json = r#"{}"#;
4842        let config: PartitionerSpec = serde_json::from_str(json).unwrap();
4843        assert!(!config.enabled);
4844        assert_eq!(config.num_partitions, 16);
4845        assert_eq!(config.strategy, "key_hash");
4846        assert!(config.key_columns.is_empty());
4847    }
4848
4849    #[test]
4850    fn test_validate_partition_strategy() {
4851        assert!(validate_partition_strategy("round_robin").is_ok());
4852        assert!(validate_partition_strategy("key_hash").is_ok());
4853        assert!(validate_partition_strategy("table_hash").is_ok());
4854        assert!(validate_partition_strategy("full_table_hash").is_ok());
4855        assert!(validate_partition_strategy("sticky").is_ok());
4856        assert!(validate_partition_strategy("invalid").is_err());
4857    }
4858
4859    #[test]
4860    fn test_validate_smt_transform_type() {
4861        assert!(validate_smt_transform_type("extract_new_record_state").is_ok());
4862        assert!(validate_smt_transform_type("mask_field").is_ok());
4863        assert!(validate_smt_transform_type("filter").is_ok());
4864        assert!(validate_smt_transform_type("flatten").is_ok());
4865        assert!(validate_smt_transform_type("cast").is_ok());
4866        assert!(validate_smt_transform_type("regex_router").is_ok());
4867        assert!(validate_smt_transform_type("content_router").is_ok());
4868        assert!(validate_smt_transform_type("invalid").is_err());
4869    }
4870
4871    #[test]
4872    fn test_parallel_cdc_spec_serde_defaults() {
4873        let json = r#"{}"#;
4874        let config: ParallelCdcSpec = serde_json::from_str(json).unwrap();
4875        assert!(!config.enabled);
4876        assert_eq!(config.concurrency, 4);
4877        assert_eq!(config.per_table_buffer, 1000);
4878        assert_eq!(config.output_buffer, 10_000);
4879        assert!(config.work_stealing);
4880        assert!(config.per_table_rate_limit.is_none());
4881        assert_eq!(config.shutdown_timeout_secs, 30);
4882    }
4883
4884    #[test]
4885    fn test_outbox_spec_serde_defaults() {
4886        let json = r#"{}"#;
4887        let config: OutboxSpec = serde_json::from_str(json).unwrap();
4888        assert!(!config.enabled);
4889        assert_eq!(config.table_name, "outbox");
4890        assert_eq!(config.poll_interval_ms, 100);
4891        assert_eq!(config.batch_size, 100);
4892        assert_eq!(config.max_retries, 3);
4893        assert_eq!(config.delivery_timeout_secs, 30);
4894        assert!(config.ordered_delivery);
4895        assert_eq!(config.retention_secs, 86400);
4896        assert_eq!(config.max_concurrency, 10);
4897    }
4898
4899    #[test]
4900    fn test_health_monitor_spec_serde_defaults() {
4901        let json = r#"{}"#;
4902        let config: HealthMonitorSpec = serde_json::from_str(json).unwrap();
4903        assert!(!config.enabled);
4904        assert_eq!(config.check_interval_secs, 10);
4905        assert_eq!(config.max_lag_ms, 30_000);
4906        assert_eq!(config.failure_threshold, 3);
4907        assert_eq!(config.success_threshold, 2);
4908        assert_eq!(config.check_timeout_secs, 5);
4909        assert!(config.auto_recovery);
4910        assert_eq!(config.recovery_delay_secs, 1);
4911        assert_eq!(config.max_recovery_delay_secs, 60);
4912    }
4913
4914    #[test]
4915    fn test_signal_table_spec_serde_defaults() {
4916        let json = r#"{}"#;
4917        let config: SignalTableSpec = serde_json::from_str(json).unwrap();
4918        assert!(!config.enabled);
4919        assert!(config.data_collection.is_none());
4920        assert!(config.topic.is_none());
4921        assert!(config.enabled_channels.is_empty());
4922        assert_eq!(config.poll_interval_ms, 1000);
4923    }
4924}