Skip to main content

rivven_operator/
crd.rs

1//! Custom Resource Definitions for Rivven Kubernetes Operator
2//!
//! This module defines the `RivvenCluster` CRD, which represents a Rivven
//! distributed streaming cluster in Kubernetes, and the `RivvenConnect` CRD
//! for managing Rivven Connect pipelines.
5
6use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use once_cell::sync::Lazy;
9use regex::Regex;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12use std::collections::BTreeMap;
13use validator::{Validate, ValidationError};
14
15/// Regex for validating Kubernetes resource quantities (e.g., "10Gi", "100Mi")
16static QUANTITY_REGEX: Lazy<Regex> =
17    Lazy::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());
18
19/// Regex for validating Kubernetes names (RFC 1123 subdomain)
20static NAME_REGEX: Lazy<Regex> =
21    Lazy::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23/// Validate a Kubernetes resource quantity string
24fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25    if QUANTITY_REGEX.is_match(value) {
26        Ok(())
27    } else {
28        Err(ValidationError::new("invalid_quantity")
29            .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30    }
31}
32
33/// Validate a container image reference
34fn validate_image(value: &str) -> Result<(), ValidationError> {
35    if value.is_empty() {
36        return Ok(()); // Empty is allowed (uses default)
37    }
38    if value.len() > 255 {
39        return Err(ValidationError::new("image_too_long")
40            .with_message("image reference exceeds 255 characters".into()));
41    }
42    // Basic format check - not overly strict to allow various registries
43    if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44        return Err(ValidationError::new("invalid_image")
45            .with_message(format!("'{}' is not a valid container image", value).into()));
46    }
47    Ok(())
48}
49
50/// Validate a Kubernetes name (RFC 1123 subdomain)
51fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52    if value.is_empty() {
53        return Ok(()); // Empty is allowed for optional fields
54    }
55    if value.len() > 63 {
56        return Err(
57            ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58        );
59    }
60    if !NAME_REGEX.is_match(value) {
61        return Err(ValidationError::new("invalid_name").with_message(
62            format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63        ));
64    }
65    Ok(())
66}
67
68/// Validate environment variable name (POSIX)
69fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
70    // Limit number of env vars to prevent resource exhaustion
71    const MAX_ENV_VARS: usize = 100;
72    if vars.len() > MAX_ENV_VARS {
73        return Err(ValidationError::new("too_many_env_vars").with_message(
74            format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
75        ));
76    }
77    for var in vars {
78        // Validate env var name format
79        if var.name.is_empty() || var.name.len() > 256 {
80            return Err(ValidationError::new("invalid_env_name")
81                .with_message("environment variable name must be 1-256 characters".into()));
82        }
83        // Check for dangerous env var names that could override security settings
84        let forbidden_prefixes = ["LD_", "DYLD_", "PATH=", "HOME=", "USER="];
85        for prefix in forbidden_prefixes {
86            if var.name.starts_with(prefix) && var.value.is_some() {
87                return Err(ValidationError::new("forbidden_env_var").with_message(
88                    format!(
89                        "environment variable '{}' is not allowed for security",
90                        var.name
91                    )
92                    .into(),
93                ));
94            }
95        }
96    }
97    Ok(())
98}
99
100/// RivvenCluster custom resource definition
101///
102/// Represents a Rivven distributed event streaming cluster deployment.
103/// The operator watches these resources and reconciles the actual cluster
104/// state to match the desired specification.
105#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
106#[kube(
107    group = "rivven.io",
108    version = "v1alpha1",
109    kind = "RivvenCluster",
110    plural = "rivvenclusters",
111    shortname = "rc",
112    namespaced,
113    status = "RivvenClusterStatus",
114    printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
115    printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
116    printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
117    printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
118)]
119#[serde(rename_all = "camelCase")]
120pub struct RivvenClusterSpec {
121    /// Number of broker replicas (1-100)
122    #[serde(default = "default_replicas")]
123    #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
124    pub replicas: i32,
125
126    /// Rivven version to deploy (must match semver pattern)
127    #[serde(default = "default_version")]
128    #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
129    pub version: String,
130
131    /// Container image (overrides version-based default)
132    /// Must be a valid container image reference
133    #[serde(default)]
134    #[validate(custom(function = "validate_optional_image"))]
135    pub image: Option<String>,
136
137    /// Image pull policy (Always, IfNotPresent, Never)
138    #[serde(default = "default_image_pull_policy")]
139    #[validate(custom(function = "validate_pull_policy"))]
140    pub image_pull_policy: String,
141
142    /// Image pull secrets (max 10 secrets)
143    #[serde(default)]
144    #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
145    pub image_pull_secrets: Vec<String>,
146
147    /// Storage configuration
148    #[serde(default)]
149    #[validate(nested)]
150    pub storage: StorageSpec,
151
152    /// Resource requirements (CPU, memory)
153    #[serde(default)]
154    #[schemars(skip)]
155    pub resources: Option<ResourceRequirements>,
156
157    /// Broker configuration parameters
158    #[serde(default)]
159    #[validate(nested)]
160    pub config: BrokerConfig,
161
162    /// TLS configuration
163    #[serde(default)]
164    #[validate(nested)]
165    pub tls: TlsSpec,
166
167    /// Metrics configuration
168    #[serde(default)]
169    #[validate(nested)]
170    pub metrics: MetricsSpec,
171
172    /// Pod affinity/anti-affinity rules
173    #[serde(default)]
174    #[schemars(skip)]
175    pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,
176
177    /// Node selector for pod scheduling (max 20 selectors)
178    #[serde(default)]
179    #[validate(custom(function = "validate_node_selector"))]
180    pub node_selector: BTreeMap<String, String>,
181
182    /// Tolerations for pod scheduling
183    #[serde(default)]
184    #[schemars(skip)]
185    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
186
187    /// Pod disruption budget configuration
188    #[serde(default)]
189    #[validate(nested)]
190    pub pod_disruption_budget: PdbSpec,
191
192    /// Service account name (must be valid K8s name)
193    #[serde(default)]
194    #[validate(custom(function = "validate_optional_k8s_name"))]
195    pub service_account: Option<String>,
196
197    /// Additional pod annotations (max 50)
198    #[serde(default)]
199    #[validate(custom(function = "validate_annotations"))]
200    pub pod_annotations: BTreeMap<String, String>,
201
202    /// Additional pod labels (max 20)
203    #[serde(default)]
204    #[validate(custom(function = "validate_labels"))]
205    pub pod_labels: BTreeMap<String, String>,
206
207    /// Environment variables (validated for security)
208    #[serde(default)]
209    #[schemars(skip)]
210    #[validate(custom(function = "validate_env_vars"))]
211    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
212
213    /// Liveness probe configuration
214    #[serde(default)]
215    #[validate(nested)]
216    pub liveness_probe: ProbeSpec,
217
218    /// Readiness probe configuration
219    #[serde(default)]
220    #[validate(nested)]
221    pub readiness_probe: ProbeSpec,
222
223    /// Security context for pods
224    #[serde(default)]
225    #[schemars(skip)]
226    pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,
227
228    /// Container security context
229    #[serde(default)]
230    #[schemars(skip)]
231    pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
232}
233
234/// Validate optional image reference
235fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
236    validate_image(image)
237}
238
239/// Validate image pull policy
240fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
241    match policy {
242        "Always" | "IfNotPresent" | "Never" => Ok(()),
243        _ => Err(ValidationError::new("invalid_pull_policy")
244            .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
245    }
246}
247
248/// Validate node selector map
249fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
250    if selectors.len() > 20 {
251        return Err(ValidationError::new("too_many_selectors")
252            .with_message("maximum 20 node selectors allowed".into()));
253    }
254    for (key, value) in selectors {
255        if key.len() > 253 || value.len() > 63 {
256            return Err(ValidationError::new("selector_too_long")
257                .with_message("selector key max 253 chars, value max 63 chars".into()));
258        }
259    }
260    Ok(())
261}
262
263/// Validate optional Kubernetes name (for use with Option<String> fields)
264fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
265    if name.is_empty() {
266        return Ok(()); // Empty is allowed for optional fields
267    }
268    validate_k8s_name(name)
269}
270
271/// Validate annotations map
272fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
273    if annotations.len() > 50 {
274        return Err(ValidationError::new("too_many_annotations")
275            .with_message("maximum 50 annotations allowed".into()));
276    }
277    for (key, value) in annotations {
278        // Annotation keys can be up to 253 chars (with optional prefix)
279        if key.len() > 253 {
280            return Err(ValidationError::new("annotation_key_too_long")
281                .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
282        }
283        // Annotation values can be up to 256KB
284        if value.len() > 262144 {
285            return Err(ValidationError::new("annotation_value_too_long")
286                .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
287        }
288    }
289    Ok(())
290}
291
292/// Validate labels map
293fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
294    if labels.len() > 20 {
295        return Err(ValidationError::new("too_many_labels")
296            .with_message("maximum 20 labels allowed".into()));
297    }
298    for (key, value) in labels {
299        if key.len() > 253 || value.len() > 63 {
300            return Err(ValidationError::new("label_too_long")
301                .with_message("label key max 253 chars, value max 63 chars".into()));
302        }
303        // Labels must not override managed labels
304        if key.starts_with("app.kubernetes.io/") {
305            return Err(ValidationError::new("reserved_label").with_message(
306                format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
307            ));
308        }
309    }
310    Ok(())
311}
312
/// Storage specification for broker data (PersistentVolumeClaim settings).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StorageSpec {
    /// Storage size (e.g., "100Gi") - must be valid Kubernetes quantity
    /// (binary or decimal suffix; default "10Gi")
    #[serde(default = "default_storage_size")]
    #[validate(custom(function = "validate_quantity"))]
    pub size: String,

    /// Storage class name (empty uses default); checked with the RFC 1123
    /// label rules
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub storage_class_name: Option<String>,

    /// Access modes for the PVC: 1-3 entries, each one of ReadWriteOnce,
    /// ReadOnlyMany, ReadWriteMany or ReadWriteOncePod (default ["ReadWriteOnce"])
    #[serde(default = "default_access_modes")]
    #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
    #[validate(custom(function = "validate_access_modes"))]
    pub access_modes: Vec<String>,
}
333
334/// Validate PVC access modes
335fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
336    let valid_modes = [
337        "ReadWriteOnce",
338        "ReadOnlyMany",
339        "ReadWriteMany",
340        "ReadWriteOncePod",
341    ];
342    for mode in modes {
343        if !valid_modes.contains(&mode.as_str()) {
344            return Err(ValidationError::new("invalid_access_mode")
345                .with_message(format!("'{}' is not a valid access mode", mode).into()));
346        }
347    }
348    Ok(())
349}
350
351impl Default for StorageSpec {
352    fn default() -> Self {
353        Self {
354            size: default_storage_size(),
355            storage_class_name: None,
356            access_modes: default_access_modes(),
357        }
358    }
359}
360
361/// Broker configuration parameters
362#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
363#[serde(rename_all = "camelCase")]
364pub struct BrokerConfig {
365    /// Default number of partitions for new topics (1-1000)
366    #[serde(default = "default_partitions")]
367    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
368    pub default_partitions: i32,
369
370    /// Default replication factor for new topics (1-10)
371    #[serde(default = "default_replication_factor")]
372    #[validate(range(
373        min = 1,
374        max = 10,
375        message = "replication factor must be between 1 and 10"
376    ))]
377    pub default_replication_factor: i32,
378
379    /// Log retention period in hours (1-8760, i.e., 1 hour to 1 year)
380    #[serde(default = "default_log_retention_hours")]
381    #[validate(range(
382        min = 1,
383        max = 8760,
384        message = "retention hours must be between 1 and 8760"
385    ))]
386    pub log_retention_hours: i32,
387
388    /// Log segment size in bytes (1MB to 10GB)
389    #[serde(default = "default_log_segment_bytes")]
390    #[validate(custom(function = "validate_segment_size"))]
391    pub log_segment_bytes: i64,
392
393    /// Maximum message size in bytes (1KB to 100MB)
394    #[serde(default = "default_max_message_bytes")]
395    #[validate(custom(function = "validate_message_size"))]
396    pub max_message_bytes: i64,
397
398    /// Enable auto topic creation
399    #[serde(default = "default_true")]
400    pub auto_create_topics: bool,
401
402    /// Enable compression
403    #[serde(default = "default_true")]
404    pub compression_enabled: bool,
405
406    /// Compression algorithm (lz4, zstd, none)
407    #[serde(default = "default_compression")]
408    #[validate(custom(function = "validate_compression_type"))]
409    pub compression_type: String,
410
411    /// Raft election timeout in milliseconds (100-60000)
412    #[serde(default = "default_election_timeout")]
413    #[validate(range(
414        min = 100,
415        max = 60000,
416        message = "election timeout must be between 100ms and 60s"
417    ))]
418    pub raft_election_timeout_ms: i32,
419
420    /// Raft heartbeat interval in milliseconds (10-10000)
421    #[serde(default = "default_heartbeat_interval")]
422    #[validate(range(
423        min = 10,
424        max = 10000,
425        message = "heartbeat interval must be between 10ms and 10s"
426    ))]
427    pub raft_heartbeat_interval_ms: i32,
428
429    /// Additional raw configuration overrides (max 50 entries)
430    #[serde(default)]
431    #[validate(custom(function = "validate_raw_config"))]
432    pub raw: BTreeMap<String, String>,
433}
434
435/// Validate compression type
436fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
437    match compression {
438        "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
439        _ => Err(ValidationError::new("invalid_compression")
440            .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
441    }
442}
443
444/// Validate log segment size (1MB to 10GB)
445fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
446    const MIN_SEGMENT_SIZE: i64 = 1_048_576; // 1MB
447    const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; // 10GB
448    if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
449        return Err(ValidationError::new("invalid_segment_size")
450            .with_message("segment size must be between 1MB and 10GB".into()));
451    }
452    Ok(())
453}
454
455/// Validate max message size (1KB to 100MB)
456fn validate_message_size(size: i64) -> Result<(), ValidationError> {
457    const MIN_MESSAGE_SIZE: i64 = 1_024; // 1KB
458    const MAX_MESSAGE_SIZE: i64 = 104_857_600; // 100MB
459    if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
460        return Err(ValidationError::new("invalid_message_size")
461            .with_message("max message size must be between 1KB and 100MB".into()));
462    }
463    Ok(())
464}
465
466/// Validate raw config map
467fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
468    if config.len() > 50 {
469        return Err(ValidationError::new("too_many_raw_configs")
470            .with_message("maximum 50 raw configuration entries allowed".into()));
471    }
472    for (key, value) in config {
473        if key.len() > 128 || value.len() > 4096 {
474            return Err(ValidationError::new("raw_config_too_long")
475                .with_message("raw config key max 128 chars, value max 4096 chars".into()));
476        }
477        // Prevent injection of dangerous config keys
478        let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
479        if forbidden_keys.contains(&key.as_str()) {
480            return Err(ValidationError::new("forbidden_raw_config")
481                .with_message(format!("raw config key '{}' is not allowed", key).into()));
482        }
483    }
484    Ok(())
485}
486
487impl Default for BrokerConfig {
488    fn default() -> Self {
489        Self {
490            default_partitions: default_partitions(),
491            default_replication_factor: default_replication_factor(),
492            log_retention_hours: default_log_retention_hours(),
493            log_segment_bytes: default_log_segment_bytes(),
494            max_message_bytes: default_max_message_bytes(),
495            auto_create_topics: default_true(),
496            compression_enabled: default_true(),
497            compression_type: default_compression(),
498            raft_election_timeout_ms: default_election_timeout(),
499            raft_heartbeat_interval_ms: default_heartbeat_interval(),
500            raw: BTreeMap::new(),
501        }
502    }
503}
504
/// TLS configuration
// NOTE(review): the "required if enabled" rule for `certSecretName` (and any
// relationship between `mtlsEnabled`, `enabled` and `caSecretName`) is not
// enforced by a validator on this struct; confirm the controller checks it.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TlsSpec {
    /// Enable TLS (default: false)
    #[serde(default)]
    pub enabled: bool,

    /// Name of the secret containing TLS certificates (required if enabled)
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS) (default: false)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// Name of the secret containing CA certificate for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,
}
527
/// Metrics configuration
///
/// Prometheus metrics settings plus optional Prometheus Operator
/// ServiceMonitor integration.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MetricsSpec {
    /// Enable Prometheus metrics (default: true)
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Metrics port (1024-65535, must be unprivileged; default: 9090)
    #[serde(default = "default_metrics_port")]
    #[validate(range(
        min = 1024,
        max = 65535,
        message = "metrics port must be between 1024 and 65535"
    ))]
    pub port: i32,

    /// ServiceMonitor configuration for Prometheus Operator
    #[serde(default)]
    #[validate(nested)]
    pub service_monitor: ServiceMonitorSpec,
}
550
551impl Default for MetricsSpec {
552    fn default() -> Self {
553        Self {
554            enabled: true,
555            port: 9090,
556            service_monitor: ServiceMonitorSpec::default(),
557        }
558    }
559}
560
561/// ServiceMonitor configuration for Prometheus Operator integration
562#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
563#[serde(rename_all = "camelCase")]
564pub struct ServiceMonitorSpec {
565    /// Create a ServiceMonitor resource
566    #[serde(default)]
567    pub enabled: bool,
568
569    /// Namespace for the ServiceMonitor (defaults to cluster namespace)
570    #[serde(default)]
571    #[validate(custom(function = "validate_optional_k8s_name"))]
572    pub namespace: Option<String>,
573
574    /// Scrape interval (must match Prometheus duration format)
575    #[serde(default = "default_scrape_interval")]
576    #[validate(custom(function = "validate_duration"))]
577    pub interval: String,
578
579    /// Additional labels for the ServiceMonitor (max 10)
580    #[serde(default)]
581    #[validate(custom(function = "validate_service_monitor_labels"))]
582    pub labels: BTreeMap<String, String>,
583}
584
585/// Validate Prometheus duration format (e.g., "30s", "1m", "5m30s")
586fn validate_duration(duration: &str) -> Result<(), ValidationError> {
587    static DURATION_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
588    if !DURATION_REGEX.is_match(duration) {
589        return Err(ValidationError::new("invalid_duration").with_message(
590            format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
591        ));
592    }
593    Ok(())
594}
595
596/// Validate ServiceMonitor labels
597fn validate_service_monitor_labels(
598    labels: &BTreeMap<String, String>,
599) -> Result<(), ValidationError> {
600    if labels.len() > 10 {
601        return Err(ValidationError::new("too_many_labels")
602            .with_message("maximum 10 ServiceMonitor labels allowed".into()));
603    }
604    Ok(())
605}
606
/// Pod Disruption Budget configuration
// NOTE(review): `maxUnavailable` defaults to "1" (via serde default), so a spec
// that sets only `minAvailable` deserializes with BOTH fields set. A Kubernetes
// PodDisruptionBudget accepts only one of the two; confirm the controller gives
// `minAvailable` precedence (or drops `maxUnavailable`) when both are present.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PdbSpec {
    /// Enable PDB creation (default: true)
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Minimum available pods (mutually exclusive with maxUnavailable)
    /// Can be an integer or percentage (e.g., "50%")
    #[serde(default)]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub min_available: Option<String>,

    /// Maximum unavailable pods (default: "1")
    /// Can be an integer or percentage (e.g., "25%")
    #[serde(default = "default_max_unavailable")]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub max_unavailable: Option<String>,
}
627
628/// Validate integer or percentage string (for Option<String> fields)
629fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
630    if value.is_empty() {
631        return Ok(());
632    }
633    // Allow integers or percentages
634    static INT_OR_PERCENT_REGEX: Lazy<Regex> =
635        Lazy::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
636    if !INT_OR_PERCENT_REGEX.is_match(value) {
637        return Err(ValidationError::new("invalid_int_or_percent").with_message(
638            format!(
639                "'{}' must be an integer or percentage (e.g., '1' or '25%')",
640                value
641            )
642            .into(),
643        ));
644    }
645    Ok(())
646}
647
648impl Default for PdbSpec {
649    fn default() -> Self {
650        Self {
651            enabled: true,
652            min_available: None,
653            max_unavailable: Some("1".to_string()),
654        }
655    }
656}
657
/// Probe configuration for liveness/readiness
///
/// The same type backs both `livenessProbe` and `readinessProbe` on the
/// cluster spec, with identical defaults.
// NOTE(review): Kubernetes requires successThreshold == 1 for liveness (and
// startup) probes, but this type allows 1-10 for both probe kinds; confirm the
// controller clamps it for the liveness probe.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ProbeSpec {
    /// Enable the probe (default: true)
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Initial delay before first probe (0-3600 seconds, default 30)
    #[serde(default = "default_initial_delay")]
    #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
    pub initial_delay_seconds: i32,

    /// Period between probes (1-300 seconds, default 10)
    #[serde(default = "default_period")]
    #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
    pub period_seconds: i32,

    /// Timeout for probe (1-60 seconds, default 5)
    #[serde(default = "default_timeout")]
    #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
    pub timeout_seconds: i32,

    /// Success threshold (1-10, default 1)
    #[serde(default = "default_one")]
    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
    pub success_threshold: i32,

    /// Failure threshold (1-30, default 3)
    #[serde(default = "default_three")]
    #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
    pub failure_threshold: i32,
}
691
692impl Default for ProbeSpec {
693    fn default() -> Self {
694        Self {
695            enabled: true,
696            initial_delay_seconds: 30,
697            period_seconds: 10,
698            timeout_seconds: 5,
699            success_threshold: 1,
700            failure_threshold: 3,
701        }
702    }
703}
704
/// Status of a RivvenCluster resource
///
/// `phase` and `readyReplicas` are surfaced through the `Phase` and `Ready`
/// kubectl print columns.
// NOTE(review): phase, replicas, readyReplicas, updatedReplicas and
// observedGeneration have no #[serde(default)], so a status object missing any
// of them fails to deserialize. Consider #[serde(default)] if partial statuses
// can occur.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterStatus {
    /// Current phase of the cluster
    pub phase: ClusterPhase,

    /// Total number of replicas
    pub replicas: i32,

    /// Number of ready replicas
    pub ready_replicas: i32,

    /// Number of updated replicas
    pub updated_replicas: i32,

    /// Current observed generation
    pub observed_generation: i64,

    /// Conditions describing cluster state
    #[serde(default)]
    pub conditions: Vec<ClusterCondition>,

    /// Broker endpoints
    #[serde(default)]
    pub broker_endpoints: Vec<String>,

    /// Current leader broker (if known)
    pub leader: Option<String>,

    /// Last time the status was updated
    // NOTE(review): presumably an RFC 3339 timestamp string; confirm with the writer.
    pub last_updated: Option<String>,

    /// Error message if any
    pub message: Option<String>,
}
741
/// Phase of the cluster lifecycle
///
/// Serialized as the bare variant name (e.g. `"Running"`), since no serde
/// renaming is applied. `Pending` is the `Default`.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ClusterPhase {
    /// Cluster is being created
    #[default]
    Pending,
    /// Cluster is being provisioned
    Provisioning,
    /// Cluster is running and healthy
    Running,
    /// Cluster is updating/rolling
    Updating,
    /// Cluster is in degraded state
    Degraded,
    /// Cluster has failed
    Failed,
    /// Cluster is being deleted
    Terminating,
}
761
/// Condition describing an aspect of cluster state
///
/// Field names are serialized in camelCase; `condition_type` is serialized as
/// `type`.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterCondition {
    /// Type of condition
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    // NOTE(review): presumably an RFC 3339 timestamp string; confirm with the writer.
    pub last_transition_time: Option<String>,
}
782
// Default value functions referenced by `#[serde(default = "...")]` attributes
// and by the manual `Default` impls above.

/// Default broker replica count.
fn default_replicas() -> i32 {
    3
}

/// Default Rivven version (also the default image tag).
fn default_version() -> String {
    String::from("0.0.1")
}

/// Default Kubernetes image pull policy.
fn default_image_pull_policy() -> String {
    String::from("IfNotPresent")
}

/// Default PVC size.
fn default_storage_size() -> String {
    String::from("10Gi")
}

/// Default PVC access modes.
fn default_access_modes() -> Vec<String> {
    vec![String::from("ReadWriteOnce")]
}

/// Default partition count for new topics.
fn default_partitions() -> i32 {
    3
}

/// Default replication factor for new topics.
fn default_replication_factor() -> i32 {
    2
}

/// Default log retention in hours (7 days).
fn default_log_retention_hours() -> i32 {
    7 * 24
}

/// Default log segment size in bytes (1 GiB).
fn default_log_segment_bytes() -> i64 {
    1 << 30
}

/// Default maximum message size in bytes (1 MiB).
fn default_max_message_bytes() -> i64 {
    1 << 20
}

/// Default compression codec.
fn default_compression() -> String {
    String::from("lz4")
}

/// Default Raft election timeout in milliseconds.
fn default_election_timeout() -> i32 {
    1000
}

/// Default Raft heartbeat interval in milliseconds.
fn default_heartbeat_interval() -> i32 {
    100
}

/// Default Prometheus metrics port.
fn default_metrics_port() -> i32 {
    9090
}

/// Default ServiceMonitor scrape interval.
fn default_scrape_interval() -> String {
    String::from("30s")
}

/// Default PDB `maxUnavailable`.
fn default_max_unavailable() -> Option<String> {
    Some(String::from("1"))
}

/// Default probe initial delay in seconds.
fn default_initial_delay() -> i32 {
    30
}

/// Default probe period in seconds.
fn default_period() -> i32 {
    10
}

/// Default probe timeout in seconds.
fn default_timeout() -> i32 {
    5
}

/// Constant `1` (probe success threshold default).
fn default_one() -> i32 {
    1
}

/// Constant `3` (probe failure threshold default).
fn default_three() -> i32 {
    3
}

/// Constant `true` for boolean flags that default to enabled.
fn default_true() -> bool {
    true
}
871
872impl RivvenClusterSpec {
873    /// Get the full container image including version
874    pub fn get_image(&self) -> String {
875        if let Some(ref image) = self.image {
876            image.clone()
877        } else {
878            format!("ghcr.io/hupe1980/rivven:{}", self.version)
879        }
880    }
881
882    /// Get labels for managed resources
883    pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
884        let mut labels = BTreeMap::new();
885        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
886        labels.insert(
887            "app.kubernetes.io/instance".to_string(),
888            cluster_name.to_string(),
889        );
890        labels.insert(
891            "app.kubernetes.io/component".to_string(),
892            "broker".to_string(),
893        );
894        labels.insert(
895            "app.kubernetes.io/managed-by".to_string(),
896            "rivven-operator".to_string(),
897        );
898        labels.insert(
899            "app.kubernetes.io/version".to_string(),
900            self.version.clone(),
901        );
902        labels
903    }
904
905    /// Get selector labels for managed resources
906    pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
907        let mut labels = BTreeMap::new();
908        labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
909        labels.insert(
910            "app.kubernetes.io/instance".to_string(),
911            cluster_name.to_string(),
912        );
913        labels
914    }
915}
916
917// ============================================================================
918// RivvenConnect CRD - Connector Framework for Rivven
919// ============================================================================
920// Note: The RivvenConnect CRD is defined here but the controller is not yet
921// implemented. The #[allow(dead_code)] attributes are temporary until the
922// connect_controller module is added.
923
924/// RivvenConnect custom resource for managing connectors
925///
926/// This CRD allows declarative management of Rivven Connect pipelines,
927/// including source connectors (CDC, HTTP, etc.) and sink connectors
928/// (S3, stdout, HTTP webhooks, etc.).
929#[allow(dead_code)]
930#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
931#[kube(
932    group = "rivven.io",
933    version = "v1alpha1",
934    kind = "RivvenConnect",
935    plural = "rivvenconnects",
936    shortname = "rc",
937    namespaced,
938    status = "RivvenConnectStatus",
939    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
940    printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
941    printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
942    printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
943    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
944    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
945)]
946#[serde(rename_all = "camelCase")]
947pub struct RivvenConnectSpec {
948    /// Reference to the RivvenCluster this connect instance connects to
949    #[validate(nested)]
950    pub cluster_ref: ClusterReference,
951
952    /// Number of connect worker replicas (1-10)
953    #[serde(default = "default_connect_replicas")]
954    #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
955    pub replicas: i32,
956
957    /// Connect image version
958    #[serde(default = "default_version")]
959    pub version: String,
960
961    /// Custom container image (overrides version-based default)
962    #[serde(default)]
963    #[validate(custom(function = "validate_optional_image"))]
964    pub image: Option<String>,
965
966    /// Image pull policy
967    #[serde(default = "default_image_pull_policy")]
968    #[validate(custom(function = "validate_pull_policy"))]
969    pub image_pull_policy: String,
970
971    /// Image pull secrets
972    #[serde(default)]
973    pub image_pull_secrets: Vec<String>,
974
975    /// Resource requests/limits (following k8s ResourceRequirements schema)
976    #[serde(default)]
977    pub resources: Option<serde_json::Value>,
978
979    /// Global connect configuration
980    #[serde(default)]
981    #[validate(nested)]
982    pub config: ConnectConfigSpec,
983
984    /// Source connectors (read from external systems, publish to Rivven)
985    #[serde(default)]
986    #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
987    pub sources: Vec<SourceConnectorSpec>,
988
989    /// Sink connectors (consume from Rivven, write to external systems)
990    #[serde(default)]
991    #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
992    pub sinks: Vec<SinkConnectorSpec>,
993
994    /// Global settings for all connectors
995    #[serde(default)]
996    #[validate(nested)]
997    pub settings: GlobalConnectSettings,
998
999    /// TLS configuration for broker connection
1000    #[serde(default)]
1001    #[validate(nested)]
1002    pub tls: ConnectTlsSpec,
1003
1004    /// Pod annotations
1005    #[serde(default)]
1006    #[validate(custom(function = "validate_annotations"))]
1007    pub pod_annotations: BTreeMap<String, String>,
1008
1009    /// Pod labels (cannot override app.kubernetes.io/* labels)
1010    #[serde(default)]
1011    #[validate(custom(function = "validate_labels"))]
1012    pub pod_labels: BTreeMap<String, String>,
1013
1014    /// Environment variables for the container
1015    #[serde(default)]
1016    #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
1017    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
1018
1019    /// Node selector for pod scheduling
1020    #[serde(default)]
1021    pub node_selector: BTreeMap<String, String>,
1022
1023    /// Pod tolerations
1024    #[serde(default)]
1025    #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
1026    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
1027
1028    /// Pod affinity rules
1029    #[serde(default)]
1030    pub affinity: Option<serde_json::Value>,
1031
1032    /// Service account name
1033    #[serde(default)]
1034    #[validate(custom(function = "validate_optional_k8s_name"))]
1035    pub service_account: Option<String>,
1036
1037    /// Pod security context
1038    #[serde(default)]
1039    pub security_context: Option<serde_json::Value>,
1040
1041    /// Container security context
1042    #[serde(default)]
1043    pub container_security_context: Option<serde_json::Value>,
1044}
1045
/// Reference to a RivvenCluster
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ClusterReference {
    /// Name of the RivvenCluster
    // Two checks apply: a 1-63 length bound and `validate_k8s_name` (defined
    // elsewhere in this file).
    #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Namespace of the RivvenCluster (defaults to same namespace)
    // NOTE(review): `None` is stored as-is; the fallback to the referencing
    // resource's namespace presumably happens in the controller — confirm.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub namespace: Option<String>,
}
1061
1062// ============================================================================
1063// RivvenTopic CRD - Declarative Topic Management
1064// ============================================================================
1065
/// RivvenTopic custom resource for declarative topic management
///
/// This CRD allows users to define topics as Kubernetes resources,
/// enabling GitOps workflows for topic lifecycle management.
///
/// # Example
///
/// ```yaml
/// apiVersion: rivven.io/v1alpha1
/// kind: RivvenTopic
/// metadata:
///   name: orders-events
///   namespace: production
/// spec:
///   clusterRef:
///     name: my-rivven-cluster
///   partitions: 12
///   replicationFactor: 3
///   config:
///     retentionMs: 604800000   # 7 days
///     cleanupPolicy: delete
///     compressionType: lz4
///   acls:
///     - principal: "user:order-service"
///       operations: ["Read", "Write"]
///     - principal: "user:analytics"
///       operations: ["Read"]
/// ```
// Note: These types are defined for CRD generation and future controller use.
// They are intentionally not yet constructed - the controller will use them.
#[allow(dead_code)]
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.io",
    version = "v1alpha1",
    kind = "RivvenTopic",
    plural = "rivventopics",
    shortname = "rt",
    namespaced,
    status = "RivvenTopicStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
    printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicSpec {
    /// Reference to the RivvenCluster this topic belongs to
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Number of partitions for the topic (1-10000)
    /// Cannot be decreased after creation
    // NOTE(review): the "cannot be decreased" rule is not enforced by this
    // struct's validation; presumably the controller enforces it — confirm.
    #[serde(default = "default_rivven_topic_partitions")]
    #[validate(range(
        min = 1,
        max = 10000,
        message = "partitions must be between 1 and 10000"
    ))]
    pub partitions: i32,

    /// Replication factor for the topic (1-10)
    /// Must not exceed the number of brokers in the cluster
    // NOTE(review): the broker-count limit cannot be checked here because the
    // spec does not know the cluster size; presumably checked at reconcile time.
    #[serde(default = "default_rivven_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: i32,

    /// Topic configuration parameters
    // When `config` is omitted, serde fills it from `TopicConfig::default()`.
    #[serde(default)]
    #[validate(nested)]
    pub config: TopicConfig,

    /// Access control list entries for the topic
    // At most 100 entries. `validate_topic_acls` also rejects any
    // principal/operation pair that appears more than once across all entries.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
    #[validate(custom(function = "validate_topic_acls"))]
    pub acls: Vec<TopicAcl>,

    /// Whether to delete the topic from Rivven when the CRD is deleted
    /// Default: true (topic is deleted when CRD is removed)
    #[serde(default = "default_true")]
    pub delete_on_remove: bool,

    /// Labels to apply to the topic metadata
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub topic_labels: BTreeMap<String, String>,
}
1159
/// Serde default for `RivvenTopicSpec::partitions` (3).
#[allow(dead_code)]
fn default_rivven_topic_partitions() -> i32 {
    3_i32
}

/// Serde default for `RivvenTopicSpec::replication_factor` (a single replica).
#[allow(dead_code)]
fn default_rivven_topic_replication() -> i32 {
    1_i32
}
1169
1170/// Topic configuration parameters
1171#[allow(dead_code)]
1172#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1173#[serde(rename_all = "camelCase")]
1174pub struct TopicConfig {
1175    /// Retention time in milliseconds (1 hour to 10 years)
1176    /// Default: 604800000 (7 days)
1177    #[serde(default = "default_topic_retention_ms")]
1178    #[validate(range(
1179        min = 3600000,
1180        max = 315360000000_i64,
1181        message = "retention must be between 1 hour and 10 years"
1182    ))]
1183    pub retention_ms: i64,
1184
1185    /// Retention size in bytes per partition (-1 for unlimited)
1186    /// Default: -1 (unlimited)
1187    #[serde(default = "default_topic_retention_bytes")]
1188    #[validate(custom(function = "validate_topic_retention_bytes"))]
1189    pub retention_bytes: i64,
1190
1191    /// Segment size in bytes (1MB to 10GB)
1192    #[serde(default = "default_topic_segment_bytes")]
1193    #[validate(custom(function = "validate_segment_size"))]
1194    pub segment_bytes: i64,
1195
1196    /// Cleanup policy: "delete", "compact", or "delete,compact"
1197    #[serde(default = "default_topic_cleanup_policy")]
1198    #[validate(custom(function = "validate_topic_cleanup_policy"))]
1199    pub cleanup_policy: String,
1200
1201    /// Compression type: "none", "gzip", "snappy", "lz4", "zstd"
1202    #[serde(default = "default_topic_compression")]
1203    #[validate(custom(function = "validate_topic_compression"))]
1204    pub compression_type: String,
1205
1206    /// Minimum number of in-sync replicas for writes (1-10)
1207    #[serde(default = "default_topic_min_isr")]
1208    #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1209    pub min_insync_replicas: i32,
1210
1211    /// Maximum message size in bytes (1KB to 100MB)
1212    #[serde(default = "default_max_message_bytes")]
1213    #[validate(custom(function = "validate_message_size"))]
1214    pub max_message_bytes: i64,
1215
1216    /// Whether to enable message timestamps
1217    #[serde(default = "default_true")]
1218    pub message_timestamp_enabled: bool,
1219
1220    /// Timestamp type: "CreateTime" or "LogAppendTime"
1221    #[serde(default = "default_topic_timestamp_type")]
1222    #[validate(custom(function = "validate_topic_timestamp_type"))]
1223    pub message_timestamp_type: String,
1224
1225    /// Whether to enable idempotent writes
1226    #[serde(default = "default_true")]
1227    pub idempotent_writes: bool,
1228
1229    /// Flush interval in milliseconds (0 for no scheduled flush)
1230    #[serde(default)]
1231    #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1232    pub flush_interval_ms: i64,
1233
1234    /// Custom configuration key-value pairs
1235    #[serde(default)]
1236    #[validate(custom(function = "validate_topic_custom_config"))]
1237    pub custom: BTreeMap<String, String>,
1238}
1239
/// Serde default for `TopicConfig::retention_ms`: seven days in milliseconds.
#[allow(dead_code)]
fn default_topic_retention_ms() -> i64 {
    7 * 24 * 60 * 60 * 1_000
}

/// Serde default for `TopicConfig::retention_bytes`: `-1` means unlimited.
#[allow(dead_code)]
fn default_topic_retention_bytes() -> i64 {
    -1
}

/// Serde default for `TopicConfig::segment_bytes`: 1 GiB.
#[allow(dead_code)]
fn default_topic_segment_bytes() -> i64 {
    1_i64 << 30
}

/// Serde default for `TopicConfig::cleanup_policy`.
#[allow(dead_code)]
fn default_topic_cleanup_policy() -> String {
    String::from("delete")
}

/// Serde default for `TopicConfig::compression_type`.
#[allow(dead_code)]
fn default_topic_compression() -> String {
    String::from("lz4")
}

/// Serde default for `TopicConfig::min_insync_replicas`.
#[allow(dead_code)]
fn default_topic_min_isr() -> i32 {
    1_i32
}

/// Serde default for `TopicConfig::message_timestamp_type`.
#[allow(dead_code)]
fn default_topic_timestamp_type() -> String {
    String::from("CreateTime")
}
1274
1275#[allow(dead_code)]
1276fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1277    if value == -1 || (1048576..=10995116277760).contains(&value) {
1278        Ok(())
1279    } else {
1280        Err(ValidationError::new("invalid_retention_bytes")
1281            .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1282    }
1283}
1284
1285#[allow(dead_code)]
1286fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1287    match policy {
1288        "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1289        _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1290            "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1291        )),
1292    }
1293}
1294
1295#[allow(dead_code)]
1296fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1297    match compression {
1298        "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1299        _ => Err(ValidationError::new("invalid_compression")
1300            .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1301    }
1302}
1303
1304#[allow(dead_code)]
1305fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1306    match ts_type {
1307        "CreateTime" | "LogAppendTime" => Ok(()),
1308        _ => Err(ValidationError::new("invalid_timestamp_type")
1309            .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1310    }
1311}
1312
1313#[allow(dead_code)]
1314fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1315    if config.len() > 50 {
1316        return Err(ValidationError::new("too_many_custom_configs")
1317            .with_message("maximum 50 custom config entries allowed".into()));
1318    }
1319    for (key, value) in config {
1320        if key.len() > 128 || value.len() > 4096 {
1321            return Err(ValidationError::new("config_too_long")
1322                .with_message("config key max 128 chars, value max 4096 chars".into()));
1323        }
1324        // Prevent overriding protected configs
1325        let protected = [
1326            "retention.ms",
1327            "retention.bytes",
1328            "segment.bytes",
1329            "cleanup.policy",
1330        ];
1331        if protected.contains(&key.as_str()) {
1332            return Err(ValidationError::new("protected_config").with_message(
1333                format!(
1334                    "'{}' must be set via dedicated field, not custom config",
1335                    key
1336                )
1337                .into(),
1338            ));
1339        }
1340    }
1341    Ok(())
1342}
1343
1344/// Topic ACL entry
1345#[allow(dead_code)]
1346#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1347#[serde(rename_all = "camelCase")]
1348pub struct TopicAcl {
1349    /// Principal (e.g., "user:myuser", "group:mygroup", "*")
1350    #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
1351    #[validate(custom(function = "validate_principal"))]
1352    pub principal: String,
1353
1354    /// Allowed operations: Read, Write, Create, Delete, Alter, Describe, All
1355    #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
1356    #[validate(custom(function = "validate_operations"))]
1357    pub operations: Vec<String>,
1358
1359    /// Permission type: Allow or Deny (default: Allow)
1360    #[serde(default = "default_permission_type")]
1361    #[validate(custom(function = "validate_permission_type"))]
1362    pub permission_type: String,
1363
1364    /// Host restriction (default: "*" for any host)
1365    #[serde(default = "default_acl_host")]
1366    #[validate(length(max = 256, message = "host must be max 256 characters"))]
1367    pub host: String,
1368}
1369
/// Serde default for `TopicAcl::permission_type`.
#[allow(dead_code)]
fn default_permission_type() -> String {
    String::from("Allow")
}

/// Serde default for `TopicAcl::host`: the wildcard host.
#[allow(dead_code)]
fn default_acl_host() -> String {
    String::from("*")
}
1379
1380#[allow(dead_code)]
1381fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1382    if principal == "*" {
1383        return Ok(());
1384    }
1385    if let Some((prefix, name)) = principal.split_once(':') {
1386        if !["user", "group", "User", "Group"].contains(&prefix) {
1387            return Err(ValidationError::new("invalid_principal_prefix")
1388                .with_message("principal prefix must be 'user:' or 'group:'".into()));
1389        }
1390        if name.is_empty() || name.len() > 128 {
1391            return Err(ValidationError::new("invalid_principal_name")
1392                .with_message("principal name must be 1-128 characters".into()));
1393        }
1394        Ok(())
1395    } else {
1396        Err(ValidationError::new("invalid_principal_format")
1397            .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1398    }
1399}
1400
1401fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1402    let valid_ops = [
1403        "Read",
1404        "Write",
1405        "Create",
1406        "Delete",
1407        "Alter",
1408        "Describe",
1409        "All",
1410        "DescribeConfigs",
1411        "AlterConfigs",
1412    ];
1413    for op in ops {
1414        if !valid_ops.contains(&op.as_str()) {
1415            return Err(ValidationError::new("invalid_operation").with_message(
1416                format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1417            ));
1418        }
1419    }
1420    Ok(())
1421}
1422
1423#[allow(dead_code)]
1424fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1425    match perm {
1426        "Allow" | "Deny" => Ok(()),
1427        _ => Err(ValidationError::new("invalid_permission_type")
1428            .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1429    }
1430}
1431
1432#[allow(dead_code)]
1433fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1434    // Check for duplicate principal+operation combinations
1435    let mut seen = std::collections::HashSet::new();
1436    for acl in acls {
1437        for op in &acl.operations {
1438            let key = format!("{}:{}", acl.principal, op);
1439            if !seen.insert(key.clone()) {
1440                return Err(ValidationError::new("duplicate_acl")
1441                    .with_message(format!("duplicate ACL entry for {}", key).into()));
1442            }
1443        }
1444    }
1445    Ok(())
1446}
1447
/// Status of the RivvenTopic resource
// Every field carries `#[serde(default)]`, so a partially written (or empty)
// status object still deserializes.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicStatus {
    /// Current phase: Pending, Creating, Ready, Updating, Deleting, Failed
    // Free-form string; the phase names above are not enforced by the type.
    #[serde(default)]
    pub phase: String,

    /// Human-readable message about current state
    #[serde(default)]
    pub message: String,

    /// Actual number of partitions (may differ during scaling)
    #[serde(default)]
    pub current_partitions: i32,

    /// Actual replication factor
    #[serde(default)]
    pub current_replication_factor: i32,

    /// Whether the topic exists in Rivven
    #[serde(default)]
    pub topic_exists: bool,

    /// Observed generation for tracking spec changes
    #[serde(default)]
    pub observed_generation: i64,

    /// Conditions for detailed status tracking
    #[serde(default)]
    pub conditions: Vec<TopicCondition>,

    /// Last time the topic was successfully synced
    // NOTE(review): stored as a string; the timestamp format (presumably
    // RFC 3339) is set by the writer, not checked here — confirm.
    #[serde(default)]
    pub last_sync_time: Option<String>,

    /// Partition leader information
    #[serde(default)]
    pub partition_info: Vec<PartitionInfo>,
}
1489
/// Condition for tracking topic status
// Unlike `RivvenTopicStatus`, no field here has a serde default: all five
// keys must be present for a condition to deserialize.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TopicCondition {
    /// Type of condition: Ready, Synced, ACLsApplied, ConfigApplied
    // Raw identifier so the serialized key is `type` (a Rust keyword).
    pub r#type: String,

    /// Status: True, False, Unknown
    pub status: String,

    /// Machine-readable reason
    pub reason: String,

    /// Human-readable message
    pub message: String,

    /// Last transition time
    // NOTE(review): plain string; presumably RFC 3339 like Kubernetes
    // conditions — confirm with the writer.
    pub last_transition_time: String,
}
1510
/// Information about a topic partition
// All fields are required when deserializing (no serde defaults).
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PartitionInfo {
    /// Partition ID
    pub partition: i32,

    /// Leader broker ID
    pub leader: i32,

    /// Replica broker IDs
    pub replicas: Vec<i32>,

    /// In-sync replica broker IDs
    pub isr: Vec<i32>,
}
1528
1529/// Global connect configuration
1530#[allow(dead_code)]
1531#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1532#[serde(rename_all = "camelCase")]
1533pub struct ConnectConfigSpec {
1534    /// State directory path inside container
1535    #[serde(default = "default_state_dir")]
1536    pub state_dir: String,
1537
1538    /// Log level (trace, debug, info, warn, error)
1539    #[serde(default = "default_log_level")]
1540    #[validate(custom(function = "validate_log_level"))]
1541    pub log_level: String,
1542}
1543
/// Serde default for `ConnectConfigSpec::state_dir`.
#[allow(dead_code)]
fn default_state_dir() -> String {
    String::from("/data/connect-state")
}

/// Serde default for `ConnectConfigSpec::log_level`.
#[allow(dead_code)]
fn default_log_level() -> String {
    String::from("info")
}
1553
1554#[allow(dead_code)]
1555fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1556    match level {
1557        "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1558        _ => Err(ValidationError::new("invalid_log_level")
1559            .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1560    }
1561}
1562
/// Source connector specification
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceConnectorSpec {
    /// Unique name for this source connector
    // NOTE(review): uniqueness across `sources` is not checked by this
    // struct's validation; confirm it is enforced elsewhere.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type (postgres-cdc, mysql-cdc, http, datagen, etc.)
    // `validate_connector_type` only checks the lowercase/hyphen shape, so
    // custom connector names are accepted as well as the built-in ones.
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Target topic to publish events to
    #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
    pub topic: String,

    /// Topic routing pattern (supports {schema}, {table}, {database} placeholders)
    // Not validated here; placeholder expansion happens outside this struct.
    #[serde(default)]
    pub topic_routing: Option<String>,

    /// Whether this source is enabled
    #[serde(default = "default_true")]
    pub enabled: bool,

    // ========================================================================
    // Typed Connector Configurations (mutually exclusive, validated at runtime)
    // Use these for built-in connectors to get validation and discoverability.
    // ========================================================================
    // NOTE(review): mutual exclusivity is not expressed in the type or its
    // validators; several of these may be `Some` at once after deserialization.
    /// PostgreSQL CDC specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub postgres_cdc: Option<PostgresCdcConfig>,

    /// MySQL CDC specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub mysql_cdc: Option<MysqlCdcConfig>,

    /// HTTP source specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub http: Option<HttpSourceConfig>,

    /// Datagen source specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub datagen: Option<DatagenConfig>,

    /// Kafka source specific configuration (rivven-queue)
    #[serde(default)]
    #[validate(nested)]
    pub kafka: Option<KafkaSourceConfig>,

    /// MQTT source specific configuration (rivven-queue)
    #[serde(default)]
    #[validate(nested)]
    pub mqtt: Option<MqttSourceConfig>,

    /// SQS source specific configuration (rivven-queue)
    #[serde(default)]
    #[validate(nested)]
    pub sqs: Option<SqsSourceConfig>,

    /// Pub/Sub source specific configuration (rivven-queue)
    #[serde(default)]
    #[validate(nested)]
    pub pubsub: Option<PubSubSourceConfig>,

    // ========================================================================
    // Generic Configuration (for custom connectors or advanced overrides)
    // ========================================================================
    /// Generic connector configuration (for custom connectors)
    /// Use typed fields above for built-in connectors when possible.
    // Untyped JSON; defaults to `null` when omitted.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Secret reference for sensitive configuration (passwords, keys)
    // Holds only the referenced object's name, checked with
    // `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Topic configuration (partitions, replication)
    #[serde(default)]
    #[validate(nested)]
    pub topic_config: SourceTopicConfigSpec,
}
1652
1653#[allow(dead_code)]
1654fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1655    // Allow standard connectors and custom ones (must be alphanumeric with hyphens)
1656    static CONNECTOR_REGEX: Lazy<Regex> =
1657        Lazy::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1658    if !CONNECTOR_REGEX.is_match(connector) {
1659        return Err(ValidationError::new("invalid_connector_type")
1660            .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1661    }
1662    Ok(())
1663}
1664
/// Table specification for CDC sources
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TableSpec {
    /// Schema/namespace (e.g., "public" for PostgreSQL)
    #[serde(default)]
    pub schema: Option<String>,

    /// Table name
    #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
    pub table: String,

    /// Override topic for this specific table
    // Not length-checked here, unlike `SourceConnectorSpec::topic`.
    #[serde(default)]
    pub topic: Option<String>,

    /// Columns to include (empty = all columns)
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 columns per table"))]
    pub columns: Vec<String>,

    /// Columns to exclude
    // NOTE(review): overlap between `columns` and `exclude_columns` is not
    // rejected here; precedence is decided by the connector — confirm.
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
    pub exclude_columns: Vec<String>,

    /// Column masking rules (column -> mask pattern)
    // Unbounded and unvalidated; mask-pattern syntax is interpreted elsewhere.
    #[serde(default)]
    pub column_masks: std::collections::BTreeMap<String, String>,
}
1696
1697// ============================================================================
// Typed CDC Configuration (typed fields plus a generic `config` fallback)
1699// ============================================================================
1700// These typed configs provide validation and discoverability for built-in
1701// connectors while preserving the generic `config` field for custom connectors.
1702
/// PostgreSQL CDC specific configuration
// Every field is optional or defaults to empty, so the derived `Default`
// matches what serde produces for `{}`.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PostgresCdcConfig {
    /// Replication slot name (auto-created if not exists)
    // NOTE(review): 63 presumably mirrors PostgreSQL's identifier length limit
    // (NAMEDATALEN - 1) — confirm.
    #[serde(default)]
    #[validate(length(max = 63, message = "slot name max 63 characters"))]
    pub slot_name: Option<String>,

    /// PostgreSQL publication name (auto-created if not exists)
    #[serde(default)]
    #[validate(length(max = 63, message = "publication name max 63 characters"))]
    pub publication: Option<String>,

    /// Snapshot mode: initial, never, when_needed, exported, custom
    // `validate_snapshot_mode` also accepts an empty string.
    #[serde(default)]
    #[validate(custom(function = "validate_snapshot_mode"))]
    pub snapshot_mode: Option<String>,

    /// Decoding plugin: pgoutput (default), wal2json, decoderbufs
    // `None` (or "") leaves the choice to the connector; the "pgoutput
    // default" is not applied in this struct.
    #[serde(default)]
    #[validate(custom(function = "validate_decoding_plugin"))]
    pub decoding_plugin: Option<String>,

    /// Include transaction metadata in events
    #[serde(default)]
    pub include_transaction_metadata: Option<bool>,

    /// Heartbeat interval in milliseconds (0 = disabled)
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 3600000,
        message = "heartbeat interval must be 0-3600000ms"
    ))]
    pub heartbeat_interval_ms: Option<i64>,

    /// Signal table for runtime control (schema.table format)
    // Format is documented but not validated here.
    #[serde(default)]
    pub signal_table: Option<String>,

    /// Tables to capture from PostgreSQL database
    // NOTE(review): only the list length is checked; individual `TableSpec`
    // entries are not marked `nested`, so their own rules are not run here.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 tables per source"))]
    pub tables: Vec<TableSpec>,
}
1750
1751#[allow(dead_code)]
1752fn validate_snapshot_mode(mode: &str) -> Result<(), ValidationError> {
1753    match mode {
1754        "" | "initial" | "never" | "when_needed" | "exported" | "custom" => Ok(()),
1755        _ => Err(ValidationError::new("invalid_snapshot_mode").with_message(
1756            "snapshot mode must be: initial, never, when_needed, exported, or custom".into(),
1757        )),
1758    }
1759}
1760
1761#[allow(dead_code)]
1762fn validate_decoding_plugin(plugin: &str) -> Result<(), ValidationError> {
1763    match plugin {
1764        "" | "pgoutput" | "wal2json" | "decoderbufs" => Ok(()),
1765        _ => Err(ValidationError::new("invalid_decoding_plugin")
1766            .with_message("decoding plugin must be: pgoutput, wal2json, or decoderbufs".into())),
1767    }
1768}
1769
/// MySQL CDC specific configuration
// Every field is optional: `#[serde(default)]` yields `None` (or an empty `Vec` for
// `tables`) when a key is absent. Field-level constraints are enforced by the derived
// `Validate` impl.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MysqlCdcConfig {
    /// Server ID for binlog replication (must be unique per cluster)
    // The upper bound 4294967295 (u32::MAX) does not fit in an i32, hence the i64 type.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 4294967295_i64,
        message = "server ID must be 1-4294967295"
    ))]
    pub server_id: Option<i64>,

    /// Snapshot mode: initial, never, when_needed, schema_only
    // Checked by `validate_mysql_snapshot_mode` (differs from the PostgreSQL set).
    #[serde(default)]
    #[validate(custom(function = "validate_mysql_snapshot_mode"))]
    pub snapshot_mode: Option<String>,

    /// Include GTID position in events
    #[serde(default)]
    pub include_gtid: Option<bool>,

    /// Heartbeat interval in milliseconds
    // Bounded to at most one hour (3600000 ms).
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 3600000,
        message = "heartbeat interval must be 0-3600000ms"
    ))]
    pub heartbeat_interval_ms: Option<i64>,

    /// Database history topic for schema changes
    #[serde(default)]
    pub database_history_topic: Option<String>,

    /// Tables to capture from MySQL database
    // Only the list length is bounded here; individual entries are `TableSpec` values.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 tables per source"))]
    pub tables: Vec<TableSpec>,
}
1811
1812#[allow(dead_code)]
1813fn validate_mysql_snapshot_mode(mode: &str) -> Result<(), ValidationError> {
1814    match mode {
1815        "" | "initial" | "never" | "when_needed" | "schema_only" => Ok(()),
1816        _ => Err(
1817            ValidationError::new("invalid_mysql_snapshot_mode").with_message(
1818                "snapshot mode must be: initial, never, when_needed, or schema_only".into(),
1819            ),
1820        ),
1821    }
1822}
1823
/// HTTP source specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HttpSourceConfig {
    /// Listen address (default: 0.0.0.0)
    // Not validated here; any string is accepted.
    #[serde(default)]
    pub listen_address: Option<String>,

    /// Listen port
    // Stored as i32; the range validator restricts it to the TCP port span.
    #[serde(default)]
    #[validate(range(min = 1, max = 65535, message = "port must be 1-65535"))]
    pub port: Option<i32>,

    /// Path prefix for webhook endpoint
    #[serde(default)]
    pub path_prefix: Option<String>,

    /// Require authentication
    #[serde(default)]
    pub require_auth: Option<bool>,

    /// Authentication secret reference
    // NOTE(review): presumably the name of a Kubernetes Secret holding the auth
    // material; confirm against the reconciler that consumes it.
    #[serde(default)]
    pub auth_secret_ref: Option<String>,
}
1850
1851/// Datagen source specific configuration
1852#[allow(dead_code)]
1853#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1854#[serde(rename_all = "camelCase")]
1855pub struct DatagenConfig {
1856    /// Events per second to generate
1857    #[serde(default)]
1858    #[validate(range(
1859        min = 1,
1860        max = 1000000,
1861        message = "events per second must be 1-1000000"
1862    ))]
1863    pub events_per_second: Option<i64>,
1864
1865    /// Total events to generate (0 = unlimited)
1866    #[serde(default)]
1867    pub max_events: Option<i64>,
1868
1869    /// Event schema type: json, avro, simple
1870    #[serde(default)]
1871    pub schema_type: Option<String>,
1872
1873    /// Random seed for reproducible generation
1874    #[serde(default)]
1875    pub seed: Option<i64>,
1876}
1877
1878// ============================================================================
1879// Typed Queue Source Configurations (rivven-queue crate)
1880// ============================================================================
1881
/// Kafka source specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct KafkaSourceConfig {
    /// Kafka broker addresses
    #[serde(default)]
    pub brokers: Option<Vec<String>>,

    /// Kafka topic to consume from
    #[serde(default)]
    pub topic: Option<String>,

    /// Consumer group ID
    #[serde(default)]
    pub consumer_group: Option<String>,

    /// Start offset: earliest, latest
    // Checked by `validate_kafka_start_offset`; unlike the sink-level start offset,
    // timestamps are not accepted here.
    #[serde(default)]
    #[validate(custom(function = "validate_kafka_start_offset"))]
    pub start_offset: Option<String>,

    /// Security protocol: plaintext, ssl, sasl_plaintext, sasl_ssl
    #[serde(default)]
    #[validate(custom(function = "validate_kafka_security_protocol"))]
    pub security_protocol: Option<String>,

    /// SASL mechanism: plain, scram-sha-256, scram-sha-512
    // NOTE(review): not validated against the documented values.
    #[serde(default)]
    pub sasl_mechanism: Option<String>,

    /// SASL username
    // Only the username is configured in this struct; there is no password field.
    #[serde(default)]
    pub sasl_username: Option<String>,
}
1917
1918#[allow(dead_code)]
1919fn validate_kafka_start_offset(offset: &str) -> Result<(), ValidationError> {
1920    match offset {
1921        "" | "earliest" | "latest" => Ok(()),
1922        _ => Err(ValidationError::new("invalid_kafka_start_offset")
1923            .with_message("start offset must be: earliest or latest".into())),
1924    }
1925}
1926
1927#[allow(dead_code)]
1928fn validate_kafka_security_protocol(protocol: &str) -> Result<(), ValidationError> {
1929    match protocol {
1930        "" | "plaintext" | "ssl" | "sasl_plaintext" | "sasl_ssl" => Ok(()),
1931        _ => Err(ValidationError::new("invalid_kafka_security_protocol")
1932            .with_message("protocol must be: plaintext, ssl, sasl_plaintext, or sasl_ssl".into())),
1933    }
1934}
1935
/// MQTT source specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MqttSourceConfig {
    /// MQTT broker URL (e.g., mqtt://broker:1883)
    #[serde(default)]
    pub broker_url: Option<String>,

    /// MQTT topics to subscribe (supports wildcards: +, #)
    #[serde(default)]
    pub topics: Option<Vec<String>>,

    /// Client ID
    #[serde(default)]
    pub client_id: Option<String>,

    /// QoS level: at_most_once, at_least_once, exactly_once
    // Spelled-out names rather than the numeric 0/1/2 levels; see `validate_mqtt_qos`.
    #[serde(default)]
    #[validate(custom(function = "validate_mqtt_qos"))]
    pub qos: Option<String>,

    /// Clean session on connect
    #[serde(default)]
    pub clean_session: Option<bool>,

    /// MQTT protocol version: v3, v311, v5
    // NOTE(review): not validated against the documented values.
    #[serde(default)]
    pub mqtt_version: Option<String>,

    /// Username for authentication
    // Only the username is configured in this struct; there is no password field.
    #[serde(default)]
    pub username: Option<String>,
}
1970
1971#[allow(dead_code)]
1972fn validate_mqtt_qos(qos: &str) -> Result<(), ValidationError> {
1973    match qos {
1974        "" | "at_most_once" | "at_least_once" | "exactly_once" => Ok(()),
1975        _ => Err(ValidationError::new("invalid_mqtt_qos")
1976            .with_message("QoS must be: at_most_once, at_least_once, or exactly_once".into())),
1977    }
1978}
1979
/// SQS source specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
// The numeric ranges below match the limits AWS documents for ReceiveMessage
// (MaxNumberOfMessages 1-10, WaitTimeSeconds 0-20) and VisibilityTimeout (0-43200 s).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SqsSourceConfig {
    /// SQS queue URL
    #[serde(default)]
    pub queue_url: Option<String>,

    /// AWS region
    #[serde(default)]
    pub region: Option<String>,

    /// Max messages per poll (1-10)
    #[serde(default)]
    #[validate(range(min = 1, max = 10, message = "max messages must be 1-10"))]
    pub max_messages: Option<i32>,

    /// Long polling wait time in seconds
    // 0 means short polling.
    #[serde(default)]
    #[validate(range(min = 0, max = 20, message = "wait time must be 0-20 seconds"))]
    pub wait_time_seconds: Option<i32>,

    /// Visibility timeout in seconds
    // 43200 s = 12 hours.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 43200,
        message = "visibility timeout must be 0-43200 seconds"
    ))]
    pub visibility_timeout: Option<i32>,

    /// AWS profile name
    #[serde(default)]
    pub aws_profile: Option<String>,

    /// IAM role ARN to assume
    #[serde(default)]
    pub role_arn: Option<String>,
}
2020
/// Google Cloud Pub/Sub source specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PubSubSourceConfig {
    /// GCP project ID
    #[serde(default)]
    pub project_id: Option<String>,

    /// Subscription name
    #[serde(default)]
    pub subscription: Option<String>,

    /// Topic name (for auto-created subscriptions)
    #[serde(default)]
    pub topic: Option<String>,

    /// Max messages per pull request
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "max messages must be 1-1000"))]
    pub max_messages: Option<i32>,

    /// Ack deadline in seconds
    // 10-600 s matches the ack deadline bounds Pub/Sub accepts.
    #[serde(default)]
    #[validate(range(min = 10, max = 600, message = "ack deadline must be 10-600 seconds"))]
    pub ack_deadline_seconds: Option<i32>,

    /// Path to service account credentials file
    // NOTE(review): interaction with `use_adc` when both are set is decided by the
    // consumer of this struct; confirm.
    #[serde(default)]
    pub credentials_file: Option<String>,

    /// Use Application Default Credentials
    #[serde(default)]
    pub use_adc: Option<bool>,
}
2056
/// S3 sink specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct S3SinkConfig {
    /// S3 bucket name
    // Only the 63-character upper bound is enforced here; no minimum length or
    // character-set check is applied.
    #[serde(default)]
    #[validate(length(max = 63, message = "bucket name max 63 characters"))]
    pub bucket: Option<String>,

    /// S3 region
    #[serde(default)]
    pub region: Option<String>,

    /// S3 endpoint URL (for MinIO, LocalStack, etc.)
    #[serde(default)]
    pub endpoint_url: Option<String>,

    /// Object key prefix
    #[serde(default)]
    pub prefix: Option<String>,

    /// Output format: json, jsonl, parquet, avro
    #[serde(default)]
    #[validate(custom(function = "validate_output_format"))]
    pub format: Option<String>,

    /// Compression: none, gzip, snappy, lz4, zstd
    #[serde(default)]
    #[validate(custom(function = "validate_s3_compression"))]
    pub compression: Option<String>,

    /// Batch size (number of events per file)
    #[serde(default)]
    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
    pub batch_size: Option<i64>,

    /// Flush interval in seconds
    // Upper bound 86400 s = one day.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 86400,
        message = "flush interval must be 1-86400 seconds"
    ))]
    pub flush_interval_seconds: Option<i64>,
}
2103
2104#[allow(dead_code)]
2105fn validate_output_format(format: &str) -> Result<(), ValidationError> {
2106    match format {
2107        "" | "json" | "jsonl" | "parquet" | "avro" => Ok(()),
2108        _ => Err(ValidationError::new("invalid_output_format")
2109            .with_message("format must be: json, jsonl, parquet, or avro".into())),
2110    }
2111}
2112
2113#[allow(dead_code)]
2114fn validate_s3_compression(compression: &str) -> Result<(), ValidationError> {
2115    match compression {
2116        "" | "none" | "gzip" | "snappy" | "lz4" | "zstd" => Ok(()),
2117        _ => Err(ValidationError::new("invalid_compression")
2118            .with_message("compression must be: none, gzip, snappy, lz4, or zstd".into())),
2119    }
2120}
2121
/// HTTP sink specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HttpSinkConfig {
    /// Target URL for HTTP requests
    // Not validated as a URL here.
    #[serde(default)]
    pub url: Option<String>,

    /// HTTP method: POST, PUT, PATCH
    // `validate_http_method` compares case-sensitively, so only uppercase names pass.
    #[serde(default)]
    #[validate(custom(function = "validate_http_method"))]
    pub method: Option<String>,

    /// Content type: application/json, application/x-ndjson
    // NOTE(review): not validated against the documented values.
    #[serde(default)]
    pub content_type: Option<String>,

    /// Request timeout in milliseconds
    #[serde(default)]
    #[validate(range(min = 100, max = 300000, message = "timeout must be 100-300000ms"))]
    pub timeout_ms: Option<i64>,

    /// Batch size (events per request)
    #[serde(default)]
    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
    pub batch_size: Option<i64>,
}
2150
2151#[allow(dead_code)]
2152fn validate_http_method(method: &str) -> Result<(), ValidationError> {
2153    match method {
2154        "" | "POST" | "PUT" | "PATCH" => Ok(()),
2155        _ => Err(ValidationError::new("invalid_http_method")
2156            .with_message("HTTP method must be: POST, PUT, or PATCH".into())),
2157    }
2158}
2159
/// Stdout sink specific configuration (for debugging)
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
// No field carries a validator, so the derived `Validate` impl accepts any value.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StdoutSinkConfig {
    /// Output format: json, pretty, raw
    // NOTE(review): `format = "pretty"` overlaps with the separate `pretty` flag;
    // which one wins is decided by the sink implementation. Confirm.
    #[serde(default)]
    pub format: Option<String>,

    /// Pretty print JSON output
    #[serde(default)]
    pub pretty: Option<bool>,

    /// Include metadata (topic, partition, offset)
    #[serde(default)]
    pub include_metadata: Option<bool>,
}
2177
2178// ============================================================================
2179// Typed Queue Sink Configurations (rivven-queue crate)
2180// ============================================================================
2181
2182/// Kafka sink specific configuration
2183#[allow(dead_code)]
2184#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2185#[serde(rename_all = "camelCase")]
2186pub struct KafkaSinkConfig {
2187    /// Kafka broker addresses
2188    #[serde(default)]
2189    pub brokers: Option<Vec<String>>,
2190
2191    /// Target Kafka topic
2192    #[serde(default)]
2193    pub topic: Option<String>,
2194
2195    /// Acks: none, leader, all
2196    #[serde(default)]
2197    #[validate(custom(function = "validate_kafka_acks"))]
2198    pub acks: Option<String>,
2199
2200    /// Compression: none, gzip, snappy, lz4, zstd
2201    #[serde(default)]
2202    pub compression: Option<String>,
2203
2204    /// Batch size in bytes
2205    #[serde(default)]
2206    #[validate(range(min = 0, max = 1048576, message = "batch size must be 0-1048576 bytes"))]
2207    pub batch_size: Option<i64>,
2208
2209    /// Linger time in milliseconds
2210    #[serde(default)]
2211    pub linger_ms: Option<i64>,
2212
2213    /// Security protocol: plaintext, ssl, sasl_plaintext, sasl_ssl
2214    #[serde(default)]
2215    pub security_protocol: Option<String>,
2216
2217    /// SASL mechanism: plain, scram-sha-256, scram-sha-512
2218    #[serde(default)]
2219    pub sasl_mechanism: Option<String>,
2220
2221    /// SASL username
2222    #[serde(default)]
2223    pub sasl_username: Option<String>,
2224}
2225
2226#[allow(dead_code)]
2227fn validate_kafka_acks(acks: &str) -> Result<(), ValidationError> {
2228    match acks {
2229        "" | "none" | "leader" | "all" | "0" | "1" | "-1" => Ok(()),
2230        _ => Err(ValidationError::new("invalid_kafka_acks")
2231            .with_message("acks must be: none, leader, all, 0, 1, or -1".into())),
2232    }
2233}
2234
2235// ============================================================================
2236// Typed Storage Sink Configurations (rivven-storage crate)
2237// ============================================================================
2238
2239/// GCS (Google Cloud Storage) sink specific configuration
2240#[allow(dead_code)]
2241#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2242#[serde(rename_all = "camelCase")]
2243pub struct GcsSinkConfig {
2244    /// GCS bucket name
2245    #[serde(default)]
2246    pub bucket: Option<String>,
2247
2248    /// Object key prefix
2249    #[serde(default)]
2250    pub prefix: Option<String>,
2251
2252    /// Output format: json, jsonl, parquet, avro
2253    #[serde(default)]
2254    pub format: Option<String>,
2255
2256    /// Compression: none, gzip
2257    #[serde(default)]
2258    pub compression: Option<String>,
2259
2260    /// Partitioning: none, daily, hourly
2261    #[serde(default)]
2262    pub partitioning: Option<String>,
2263
2264    /// Batch size (events per file)
2265    #[serde(default)]
2266    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
2267    pub batch_size: Option<i64>,
2268
2269    /// Flush interval in seconds
2270    #[serde(default)]
2271    #[validate(range(
2272        min = 1,
2273        max = 86400,
2274        message = "flush interval must be 1-86400 seconds"
2275    ))]
2276    pub flush_interval_seconds: Option<i64>,
2277
2278    /// Path to service account credentials file
2279    #[serde(default)]
2280    pub credentials_file: Option<String>,
2281
2282    /// Use Application Default Credentials
2283    #[serde(default)]
2284    pub use_adc: Option<bool>,
2285}
2286
2287/// Azure Blob Storage sink specific configuration
2288#[allow(dead_code)]
2289#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2290#[serde(rename_all = "camelCase")]
2291pub struct AzureBlobSinkConfig {
2292    /// Storage account name
2293    #[serde(default)]
2294    pub account_name: Option<String>,
2295
2296    /// Container name
2297    #[serde(default)]
2298    pub container: Option<String>,
2299
2300    /// Blob prefix
2301    #[serde(default)]
2302    pub prefix: Option<String>,
2303
2304    /// Output format: json, jsonl, parquet, avro
2305    #[serde(default)]
2306    pub format: Option<String>,
2307
2308    /// Compression: none, gzip
2309    #[serde(default)]
2310    pub compression: Option<String>,
2311
2312    /// Partitioning: none, daily, hourly
2313    #[serde(default)]
2314    pub partitioning: Option<String>,
2315
2316    /// Batch size (events per file)
2317    #[serde(default)]
2318    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
2319    pub batch_size: Option<i64>,
2320
2321    /// Flush interval in seconds
2322    #[serde(default)]
2323    #[validate(range(
2324        min = 1,
2325        max = 86400,
2326        message = "flush interval must be 1-86400 seconds"
2327    ))]
2328    pub flush_interval_seconds: Option<i64>,
2329}
2330
2331// ============================================================================
2332// Typed Warehouse Sink Configurations (rivven-warehouse crate)
2333// ============================================================================
2334
/// Snowflake sink specific configuration (Snowpipe Streaming)
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
// Authentication is configured through a private key file path; this struct has
// no password field.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SnowflakeSinkConfig {
    /// Snowflake account identifier (e.g., "myorg-account123")
    #[serde(default)]
    pub account: Option<String>,

    /// Snowflake user name
    #[serde(default)]
    pub user: Option<String>,

    /// Path to PKCS#8 private key file
    #[serde(default)]
    pub private_key_path: Option<String>,

    /// Target database name
    #[serde(default)]
    pub database: Option<String>,

    /// Target schema name
    #[serde(default)]
    pub schema: Option<String>,

    /// Target table name
    #[serde(default)]
    pub table: Option<String>,

    /// Snowflake warehouse name
    #[serde(default)]
    pub warehouse: Option<String>,

    /// Snowflake role name
    #[serde(default)]
    pub role: Option<String>,

    /// Batch size (events per insert)
    // The only validated field in this struct.
    #[serde(default)]
    #[validate(range(min = 1, max = 100000, message = "batch size must be 1-100000"))]
    pub batch_size: Option<i64>,
}
2377
/// BigQuery sink specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct BigQuerySinkConfig {
    /// GCP project ID
    #[serde(default)]
    pub project_id: Option<String>,

    /// BigQuery dataset ID
    #[serde(default)]
    pub dataset_id: Option<String>,

    /// BigQuery table ID
    #[serde(default)]
    pub table_id: Option<String>,

    /// Path to service account credentials file
    // NOTE(review): interaction with `use_adc` when both are set is decided by the
    // consumer of this struct; confirm.
    #[serde(default)]
    pub credentials_file: Option<String>,

    /// Use Application Default Credentials
    #[serde(default)]
    pub use_adc: Option<bool>,

    /// Batch size (rows per insert request)
    #[serde(default)]
    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
    pub batch_size: Option<i64>,

    /// Auto-create table if not exists
    #[serde(default)]
    pub auto_create_table: Option<bool>,
}
2412
/// Redshift sink specific configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RedshiftSinkConfig {
    /// Redshift cluster endpoint/host
    #[serde(default)]
    pub host: Option<String>,

    /// Redshift port (default: 5439)
    // Stored as i32; the range validator restricts it to the TCP port span.
    #[serde(default)]
    #[validate(range(min = 1, max = 65535, message = "port must be 1-65535"))]
    pub port: Option<i32>,

    /// Database name
    #[serde(default)]
    pub database: Option<String>,

    /// Redshift user name
    #[serde(default)]
    pub user: Option<String>,

    /// Target schema name
    #[serde(default)]
    pub schema: Option<String>,

    /// Target table name
    #[serde(default)]
    pub table: Option<String>,

    /// SSL mode: disable, prefer, require, verify-ca, verify-full
    // Checked by `validate_redshift_ssl_mode`, which uses hyphenated spellings.
    #[serde(default)]
    #[validate(custom(function = "validate_redshift_ssl_mode"))]
    pub ssl_mode: Option<String>,

    /// Batch size (rows per batch insert)
    #[serde(default)]
    #[validate(range(min = 1, max = 100000, message = "batch size must be 1-100000"))]
    pub batch_size: Option<i64>,
}
2453
2454#[allow(dead_code)]
2455fn validate_redshift_ssl_mode(mode: &str) -> Result<(), ValidationError> {
2456    match mode {
2457        "" | "disable" | "prefer" | "require" | "verify-ca" | "verify-full" => Ok(()),
2458        _ => Err(ValidationError::new("invalid_ssl_mode").with_message(
2459            "SSL mode must be: disable, prefer, require, verify-ca, or verify-full".into(),
2460        )),
2461    }
2462}
2463
/// Source topic configuration
// All fields are optional and default to `None` when absent (`#[serde(default)]`).
// The bounds match those on `TopicSettingsSpec` (partitions 1-1000, replication 1-10).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceTopicConfigSpec {
    /// Number of partitions for auto-created topics
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub partitions: Option<i32>,

    /// Replication factor for auto-created topics
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: Option<i32>,

    /// Auto-create topics if they don't exist
    #[serde(default)]
    pub auto_create: Option<bool>,
}
2487
/// Sink connector specification
// Required keys: `name`, `connector`, `topics`, `consumerGroup` (no serde default).
// Everything else is optional. `Default` is not derived for this struct.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SinkConnectorSpec {
    /// Unique name for this sink connector
    // Must pass both the length bound and `validate_k8s_name`.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type (stdout, s3, http, elasticsearch, etc.)
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Topics to consume from (supports wildcards like "cdc.*")
    // Only the number of entries is bounded; individual topic strings are not checked.
    #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
    pub topics: Vec<String>,

    /// Consumer group for offset tracking
    #[validate(length(
        min = 1,
        max = 128,
        message = "consumer group must be 1-128 characters"
    ))]
    pub consumer_group: String,

    /// Whether this sink is enabled
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Starting offset (earliest, latest, or timestamp)
    // Defaults to "latest" via `default_start_offset`.
    #[serde(default = "default_start_offset")]
    #[validate(custom(function = "validate_start_offset"))]
    pub start_offset: String,

    // ========================================================================
    // Typed Connector Configurations (mutually exclusive, validated at runtime)
    // Use these for built-in connectors to get validation and discoverability.
    // ========================================================================
    // Each typed block is validated recursively (`nested`) when present. Mutual
    // exclusivity between them is not enforced by this struct's derives.
    /// S3 sink specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub s3: Option<S3SinkConfig>,

    /// HTTP sink specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub http: Option<HttpSinkConfig>,

    /// Stdout sink specific configuration
    #[serde(default)]
    #[validate(nested)]
    pub stdout: Option<StdoutSinkConfig>,

    /// Kafka sink specific configuration (rivven-queue)
    #[serde(default)]
    #[validate(nested)]
    pub kafka: Option<KafkaSinkConfig>,

    /// GCS sink specific configuration (rivven-storage)
    #[serde(default)]
    #[validate(nested)]
    pub gcs: Option<GcsSinkConfig>,

    /// Azure Blob sink specific configuration (rivven-storage)
    #[serde(default)]
    #[validate(nested)]
    pub azure_blob: Option<AzureBlobSinkConfig>,

    /// Snowflake sink specific configuration (rivven-warehouse)
    #[serde(default)]
    #[validate(nested)]
    pub snowflake: Option<SnowflakeSinkConfig>,

    /// BigQuery sink specific configuration (rivven-warehouse)
    #[serde(default)]
    #[validate(nested)]
    pub bigquery: Option<BigQuerySinkConfig>,

    /// Redshift sink specific configuration (rivven-warehouse)
    #[serde(default)]
    #[validate(nested)]
    pub redshift: Option<RedshiftSinkConfig>,

    // ========================================================================
    // Generic Configuration (for custom connectors or advanced overrides)
    // ========================================================================
    /// Generic connector configuration (for custom connectors)
    /// Use typed fields above for built-in connectors when possible.
    // Defaults to `serde_json::Value::Null` when omitted; its contents are not validated.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Secret reference for sensitive configuration
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Rate limiting configuration
    // Falls back to `RateLimitSpec::default()` (0 events/s, no burst) when omitted.
    #[serde(default)]
    #[validate(nested)]
    pub rate_limit: RateLimitSpec,
}
2591
/// Serde default for a sink's `start_offset`: begin from the latest offset.
#[allow(dead_code)]
fn default_start_offset() -> String {
    String::from("latest")
}
2596
2597#[allow(dead_code)]
2598fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
2599    match offset {
2600        "earliest" | "latest" => Ok(()),
2601        s if s.contains('T') && s.contains(':') => Ok(()), // ISO 8601 timestamp
2602        _ => Err(ValidationError::new("invalid_start_offset").with_message(
2603            "start offset must be 'earliest', 'latest', or ISO 8601 timestamp".into(),
2604        )),
2605    }
2606}
2607
/// Rate limiting configuration for sinks
// `events_per_second` is not optional: `#[serde(default)]` makes it 0 (documented as
// "unlimited") when omitted. `burst_capacity` defaults to `None`.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RateLimitSpec {
    /// Maximum events per second (0 = unlimited)
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 1_000_000,
        message = "events per second must be 0-1000000"
    ))]
    pub events_per_second: u64,

    /// Burst capacity (extra events above steady rate)
    #[serde(default)]
    #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
    pub burst_capacity: Option<u64>,
}
2627
/// Global settings for all connectors
// Each section falls back to its type's `Default` impl when omitted
// (`#[serde(default)]`) and is validated recursively (`#[validate(nested)]`).
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct GlobalConnectSettings {
    /// Topic auto-creation settings
    #[serde(default)]
    #[validate(nested)]
    pub topic: TopicSettingsSpec,

    /// Retry configuration
    #[serde(default)]
    #[validate(nested)]
    pub retry: RetryConfigSpec,

    /// Health check configuration
    #[serde(default)]
    #[validate(nested)]
    pub health: HealthConfigSpec,

    /// Metrics configuration
    #[serde(default)]
    #[validate(nested)]
    pub metrics: ConnectMetricsSpec,
}
2653
/// Topic settings for auto-creation
// `Default` is not derived. It is implemented manually for this type, and each field
// also has its own `#[serde(default = ...)]` function for missing keys.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TopicSettingsSpec {
    /// Enable automatic topic creation
    #[serde(default = "default_true")]
    pub auto_create: bool,

    /// Default partitions for new topics
    #[serde(default = "default_topic_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor
    #[serde(default = "default_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Fail if topic doesn't exist and auto_create is false
    #[serde(default = "default_true")]
    pub require_topic_exists: bool,
}
2681
/// Serde default for `TopicSettingsSpec::default_partitions`: a single partition.
#[allow(dead_code)]
fn default_topic_partitions() -> i32 {
    1_i32
}
2686
/// Serde default for `TopicSettingsSpec::default_replication_factor`: one replica.
#[allow(dead_code)]
fn default_topic_replication() -> i32 {
    1_i32
}
2691
2692impl Default for TopicSettingsSpec {
2693    fn default() -> Self {
2694        Self {
2695            auto_create: true,
2696            default_partitions: 1,
2697            default_replication_factor: 1,
2698            require_topic_exists: true,
2699        }
2700    }
2701}
2702
2703/// Retry configuration
2704#[allow(dead_code)]
2705#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2706#[serde(rename_all = "camelCase")]
2707pub struct RetryConfigSpec {
2708    /// Maximum retry attempts
2709    #[serde(default = "default_max_retries")]
2710    #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2711    pub max_retries: i32,
2712
2713    /// Initial backoff in milliseconds
2714    #[serde(default = "default_initial_backoff_ms")]
2715    #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
2716    pub initial_backoff_ms: i64,
2717
2718    /// Maximum backoff in milliseconds
2719    #[serde(default = "default_max_backoff_ms")]
2720    #[validate(range(
2721        min = 100,
2722        max = 3600000,
2723        message = "max backoff must be 100-3600000ms"
2724    ))]
2725    pub max_backoff_ms: i64,
2726
2727    /// Backoff multiplier
2728    #[serde(default = "default_backoff_multiplier")]
2729    pub backoff_multiplier: f64,
2730}
2731
/// Serde default for `RetryConfigSpec::max_retries`: ten attempts.
#[allow(dead_code)]
fn default_max_retries() -> i32 {
    10_i32
}
2736
/// Serde default for `RetryConfigSpec::initial_backoff_ms`: 100 milliseconds.
#[allow(dead_code)]
fn default_initial_backoff_ms() -> i64 {
    100_i64
}
2741
/// Serde default for `RetryConfigSpec::max_backoff_ms`: 30 seconds, in milliseconds.
#[allow(dead_code)]
fn default_max_backoff_ms() -> i64 {
    30_000_i64
}
2746
/// Serde default for `RetryConfigSpec::backoff_multiplier`: doubling.
#[allow(dead_code)]
fn default_backoff_multiplier() -> f64 {
    2.0_f64
}
2751
2752impl Default for RetryConfigSpec {
2753    fn default() -> Self {
2754        Self {
2755            max_retries: 10,
2756            initial_backoff_ms: 100,
2757            max_backoff_ms: 30000,
2758            backoff_multiplier: 2.0,
2759        }
2760    }
2761}
2762
/// Health check configuration
///
/// The endpoint is disabled unless `enabled` is set; the port defaults to 8080 and
/// the path to `/health`.
// NOTE(review): `path` carries no validation (e.g. a leading '/') — confirm that
// values such as "" or "health" are acceptable to whatever consumes this spec.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HealthConfigSpec {
    /// Enable health check HTTP endpoint
    #[serde(default)]
    pub enabled: bool,

    /// Health check port (1024-65535)
    #[serde(default = "default_health_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,

    /// Health check path
    #[serde(default = "default_health_path")]
    pub path: String,
}
2781
/// Serde default for `HealthConfigSpec::port`.
#[allow(dead_code)]
fn default_health_port() -> i32 {
    8080_i32
}
2786
/// Serde default for `HealthConfigSpec::path`.
#[allow(dead_code)]
fn default_health_path() -> String {
    String::from("/health")
}
2791
2792impl Default for HealthConfigSpec {
2793    fn default() -> Self {
2794        Self {
2795            enabled: false,
2796            port: 8080,
2797            path: "/health".to_string(),
2798        }
2799    }
2800}
2801
/// Metrics configuration for connect
///
/// The endpoint is disabled unless `enabled` is set; the port defaults to 9091.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectMetricsSpec {
    /// Enable metrics endpoint
    #[serde(default)]
    pub enabled: bool,

    /// Metrics port (1024-65535)
    #[serde(default = "default_connect_metrics_port")]
    #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
    pub port: i32,
}
2816
/// Serde default for `ConnectMetricsSpec::port`.
#[allow(dead_code)]
fn default_connect_metrics_port() -> i32 {
    9091_i32
}
2821
2822impl Default for ConnectMetricsSpec {
2823    fn default() -> Self {
2824        Self {
2825            enabled: false,
2826            port: 9091,
2827        }
2828    }
2829}
2830
/// TLS configuration for connect broker connection
///
/// Every field is optional. When set, both secret names must pass
/// `validate_optional_k8s_name`.
// NOTE(review): no cross-field check ties `mtlsEnabled` to `caSecretName`, or
// `enabled` to `certSecretName` — confirm whether those combinations should be
// rejected at validation time.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ConnectTlsSpec {
    /// Enable TLS for broker connection
    #[serde(default)]
    pub enabled: bool,

    /// Secret containing TLS certificates
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Enable mTLS (mutual TLS)
    #[serde(default)]
    pub mtls_enabled: bool,

    /// CA secret name for mTLS
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,

    /// Skip server certificate verification (DANGEROUS - testing only)
    #[serde(default)]
    pub insecure: bool,
}
2858
/// Default connect replica count: a single instance.
#[allow(dead_code)]
fn default_connect_replicas() -> i32 {
    1_i32
}
2863
2864/// Status of a RivvenConnect resource
2865#[allow(dead_code)]
2866#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
2867#[serde(rename_all = "camelCase")]
2868pub struct RivvenConnectStatus {
2869    /// Current phase of the connect instance
2870    pub phase: ConnectPhase,
2871
2872    /// Total replicas
2873    pub replicas: i32,
2874
2875    /// Ready replicas
2876    pub ready_replicas: i32,
2877
2878    /// Number of source connectors running
2879    pub sources_running: i32,
2880
2881    /// Number of sink connectors running
2882    pub sinks_running: i32,
2883
2884    /// Total number of sources configured
2885    pub sources_total: i32,
2886
2887    /// Total number of sinks configured
2888    pub sinks_total: i32,
2889
2890    /// Current observed generation
2891    pub observed_generation: i64,
2892
2893    /// Conditions describing connect state
2894    #[serde(default)]
2895    pub conditions: Vec<ConnectCondition>,
2896
2897    /// Individual connector statuses
2898    #[serde(default)]
2899    pub connector_statuses: Vec<ConnectorStatus>,
2900
2901    /// Last time the status was updated
2902    pub last_updated: Option<String>,
2903
2904    /// Error message if any
2905    pub message: Option<String>,
2906}
2907
/// Phase of the connect lifecycle
///
/// Serialized as the bare variant name (e.g. `"Running"`), since no serde rename
/// is applied. `Pending` is the `Default` variant.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ConnectPhase {
    /// Connect is being created
    #[default]
    Pending,
    /// Connect is starting up
    Starting,
    /// Connect is running and healthy
    Running,
    /// Connect is partially healthy (some connectors failed)
    Degraded,
    /// Connect has failed
    Failed,
    /// Connect is being deleted
    Terminating,
}
2926
/// Condition describing an aspect of connect state
///
/// The condition type is serialized under the JSON key `type`.
// NOTE(review): `status` and the timestamp are free-form strings; nothing here
// enforces True/False/Unknown or a time format (presumably RFC 3339, as for
// cluster conditions — confirm with the code that writes these).
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectCondition {
    /// Type of condition (Ready, BrokerConnected, SourcesHealthy, SinksHealthy)
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Status of the condition (True, False, Unknown)
    pub status: String,

    /// Reason for the condition
    pub reason: Option<String>,

    /// Human-readable message
    pub message: Option<String>,

    /// Last transition time
    pub last_transition_time: Option<String>,
}
2948
/// Status of an individual connector
///
/// Entries are stored in `RivvenConnectStatus::connector_statuses`.
// NOTE(review): `connector_type` and `state` are free-form strings; the listed
// values are documentation only and are not enforced by this type.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConnectorStatus {
    /// Connector name
    pub name: String,

    /// Connector type (source or sink)
    pub connector_type: String,

    /// Connector kind (postgres-cdc, stdout, etc.)
    pub kind: String,

    /// Current state (running, stopped, failed)
    pub state: String,

    /// Number of events processed
    pub events_processed: i64,

    /// Last error message
    pub last_error: Option<String>,

    /// Last successful operation time
    pub last_success_time: Option<String>,
}
2975
2976#[allow(dead_code)]
2977impl RivvenConnectSpec {
2978    /// Get the full container image including version
2979    pub fn get_image(&self) -> String {
2980        if let Some(ref image) = self.image {
2981            image.clone()
2982        } else {
2983            format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2984        }
2985    }
2986
2987    /// Get labels for managed resources
2988    pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2989        let mut labels = BTreeMap::new();
2990        labels.insert(
2991            "app.kubernetes.io/name".to_string(),
2992            "rivven-connect".to_string(),
2993        );
2994        labels.insert(
2995            "app.kubernetes.io/instance".to_string(),
2996            connect_name.to_string(),
2997        );
2998        labels.insert(
2999            "app.kubernetes.io/component".to_string(),
3000            "connector".to_string(),
3001        );
3002        labels.insert(
3003            "app.kubernetes.io/managed-by".to_string(),
3004            "rivven-operator".to_string(),
3005        );
3006        labels.insert(
3007            "app.kubernetes.io/version".to_string(),
3008            self.version.clone(),
3009        );
3010        labels
3011    }
3012
3013    /// Get selector labels for managed resources
3014    pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
3015        let mut labels = BTreeMap::new();
3016        labels.insert(
3017            "app.kubernetes.io/name".to_string(),
3018            "rivven-connect".to_string(),
3019        );
3020        labels.insert(
3021            "app.kubernetes.io/instance".to_string(),
3022            connect_name.to_string(),
3023        );
3024        labels
3025    }
3026
3027    /// Get enabled sources count
3028    pub fn enabled_sources_count(&self) -> usize {
3029        self.sources.iter().filter(|s| s.enabled).count()
3030    }
3031
3032    /// Get enabled sinks count
3033    pub fn enabled_sinks_count(&self) -> usize {
3034        self.sinks.iter().filter(|s| s.enabled).count()
3035    }
3036}
3037
3038#[cfg(test)]
3039mod tests {
3040    use super::*;
3041
3042    #[test]
3043    fn test_default_spec() {
3044        let spec = RivvenClusterSpec {
3045            replicas: 3,
3046            version: "0.0.1".to_string(),
3047            image: None,
3048            image_pull_policy: "IfNotPresent".to_string(),
3049            image_pull_secrets: vec![],
3050            storage: StorageSpec::default(),
3051            resources: None,
3052            config: BrokerConfig::default(),
3053            tls: TlsSpec::default(),
3054            metrics: MetricsSpec::default(),
3055            affinity: None,
3056            node_selector: BTreeMap::new(),
3057            tolerations: vec![],
3058            pod_disruption_budget: PdbSpec::default(),
3059            service_account: None,
3060            pod_annotations: BTreeMap::new(),
3061            pod_labels: BTreeMap::new(),
3062            env: vec![],
3063            liveness_probe: ProbeSpec::default(),
3064            readiness_probe: ProbeSpec::default(),
3065            security_context: None,
3066            container_security_context: None,
3067        };
3068
3069        assert_eq!(spec.replicas, 3);
3070        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
3071    }
3072
3073    #[test]
3074    fn test_get_labels() {
3075        let spec = RivvenClusterSpec {
3076            replicas: 3,
3077            version: "0.0.1".to_string(),
3078            image: None,
3079            image_pull_policy: "IfNotPresent".to_string(),
3080            image_pull_secrets: vec![],
3081            storage: StorageSpec::default(),
3082            resources: None,
3083            config: BrokerConfig::default(),
3084            tls: TlsSpec::default(),
3085            metrics: MetricsSpec::default(),
3086            affinity: None,
3087            node_selector: BTreeMap::new(),
3088            tolerations: vec![],
3089            pod_disruption_budget: PdbSpec::default(),
3090            service_account: None,
3091            pod_annotations: BTreeMap::new(),
3092            pod_labels: BTreeMap::new(),
3093            env: vec![],
3094            liveness_probe: ProbeSpec::default(),
3095            readiness_probe: ProbeSpec::default(),
3096            security_context: None,
3097            container_security_context: None,
3098        };
3099
3100        let labels = spec.get_labels("my-cluster");
3101        assert_eq!(
3102            labels.get("app.kubernetes.io/name"),
3103            Some(&"rivven".to_string())
3104        );
3105        assert_eq!(
3106            labels.get("app.kubernetes.io/instance"),
3107            Some(&"my-cluster".to_string())
3108        );
3109    }
3110
3111    #[test]
3112    fn test_custom_image() {
3113        let spec = RivvenClusterSpec {
3114            replicas: 1,
3115            version: "0.0.1".to_string(),
3116            image: Some("my-registry/rivven:custom".to_string()),
3117            image_pull_policy: "Always".to_string(),
3118            image_pull_secrets: vec![],
3119            storage: StorageSpec::default(),
3120            resources: None,
3121            config: BrokerConfig::default(),
3122            tls: TlsSpec::default(),
3123            metrics: MetricsSpec::default(),
3124            affinity: None,
3125            node_selector: BTreeMap::new(),
3126            tolerations: vec![],
3127            pod_disruption_budget: PdbSpec::default(),
3128            service_account: None,
3129            pod_annotations: BTreeMap::new(),
3130            pod_labels: BTreeMap::new(),
3131            env: vec![],
3132            liveness_probe: ProbeSpec::default(),
3133            readiness_probe: ProbeSpec::default(),
3134            security_context: None,
3135            container_security_context: None,
3136        };
3137
3138        assert_eq!(spec.get_image(), "my-registry/rivven:custom");
3139    }
3140
3141    #[test]
3142    fn test_cluster_phase_default() {
3143        let phase = ClusterPhase::default();
3144        assert_eq!(phase, ClusterPhase::Pending);
3145    }
3146
3147    #[test]
3148    fn test_storage_spec_default() {
3149        let storage = StorageSpec::default();
3150        assert_eq!(storage.size, "10Gi");
3151        assert!(storage.storage_class_name.is_none());
3152    }
3153
3154    #[test]
3155    fn test_broker_config_defaults() {
3156        let config = BrokerConfig::default();
3157        assert_eq!(config.default_partitions, 3);
3158        assert_eq!(config.default_replication_factor, 2);
3159        assert!(config.auto_create_topics);
3160    }
3161
3162    #[test]
3163    fn test_probe_spec_defaults() {
3164        let probe = ProbeSpec::default();
3165        assert!(probe.enabled);
3166        assert_eq!(probe.initial_delay_seconds, 30);
3167        assert_eq!(probe.period_seconds, 10);
3168    }
3169
3170    #[test]
3171    fn test_validate_quantity_valid() {
3172        assert!(validate_quantity("10Gi").is_ok());
3173        assert!(validate_quantity("100Mi").is_ok());
3174        assert!(validate_quantity("1Ti").is_ok());
3175        assert!(validate_quantity("500").is_ok());
3176        assert!(validate_quantity("1.5Gi").is_ok());
3177    }
3178
3179    #[test]
3180    fn test_validate_quantity_invalid() {
3181        assert!(validate_quantity("10GB").is_err()); // Wrong suffix
3182        assert!(validate_quantity("abc").is_err()); // Not a number
3183        assert!(validate_quantity("-10Gi").is_err()); // Negative
3184        assert!(validate_quantity("").is_err()); // Empty
3185    }
3186
3187    #[test]
3188    fn test_validate_k8s_name_valid() {
3189        assert!(validate_k8s_name("my-cluster").is_ok());
3190        assert!(validate_k8s_name("cluster123").is_ok());
3191        assert!(validate_k8s_name("a").is_ok());
3192    }
3193
3194    #[test]
3195    fn test_validate_k8s_name_invalid() {
3196        assert!(validate_k8s_name("My-Cluster").is_err()); // Uppercase
3197        assert!(validate_k8s_name("-cluster").is_err()); // Starts with dash
3198        assert!(validate_k8s_name("cluster-").is_err()); // Ends with dash
3199        assert!(validate_k8s_name("cluster_name").is_err()); // Underscore
3200    }
3201
3202    #[test]
3203    fn test_validate_compression_type() {
3204        assert!(validate_compression_type("lz4").is_ok());
3205        assert!(validate_compression_type("zstd").is_ok());
3206        assert!(validate_compression_type("none").is_ok());
3207        assert!(validate_compression_type("invalid").is_err());
3208    }
3209
3210    #[test]
3211    fn test_validate_segment_size() {
3212        assert!(validate_segment_size(1_048_576).is_ok()); // 1MB - minimum
3213        assert!(validate_segment_size(10_737_418_240).is_ok()); // 10GB - maximum
3214        assert!(validate_segment_size(1_073_741_824).is_ok()); // 1GB - valid
3215        assert!(validate_segment_size(1_000).is_err()); // Too small
3216        assert!(validate_segment_size(20_000_000_000).is_err()); // Too large
3217    }
3218
3219    #[test]
3220    fn test_validate_message_size() {
3221        assert!(validate_message_size(1_024).is_ok()); // 1KB - minimum
3222        assert!(validate_message_size(104_857_600).is_ok()); // 100MB - maximum
3223        assert!(validate_message_size(1_048_576).is_ok()); // 1MB - valid
3224        assert!(validate_message_size(100).is_err()); // Too small
3225        assert!(validate_message_size(200_000_000).is_err()); // Too large
3226    }
3227
3228    #[test]
3229    fn test_validate_pull_policy() {
3230        assert!(validate_pull_policy("Always").is_ok());
3231        assert!(validate_pull_policy("IfNotPresent").is_ok());
3232        assert!(validate_pull_policy("Never").is_ok());
3233        assert!(validate_pull_policy("always").is_err()); // Wrong case
3234        assert!(validate_pull_policy("Invalid").is_err());
3235    }
3236
3237    #[test]
3238    fn test_validate_duration() {
3239        assert!(validate_duration("30s").is_ok());
3240        assert!(validate_duration("1m").is_ok());
3241        assert!(validate_duration("5m30s").is_ok());
3242        assert!(validate_duration("1h").is_ok());
3243        assert!(validate_duration("invalid").is_err());
3244        assert!(validate_duration("30").is_err()); // Missing unit
3245    }
3246
3247    #[test]
3248    fn test_validate_access_modes() {
3249        assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
3250        assert!(
3251            validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
3252                .is_ok()
3253        );
3254        assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
3255    }
3256
3257    // RivvenConnect CRD tests
3258    #[test]
3259    fn test_connect_spec_defaults() {
3260        let spec = RivvenConnectSpec {
3261            cluster_ref: ClusterReference {
3262                name: "my-cluster".to_string(),
3263                namespace: None,
3264            },
3265            replicas: 1,
3266            version: "0.0.1".to_string(),
3267            image: None,
3268            image_pull_policy: "IfNotPresent".to_string(),
3269            image_pull_secrets: vec![],
3270            resources: None,
3271            config: ConnectConfigSpec::default(),
3272            sources: vec![],
3273            sinks: vec![],
3274            settings: GlobalConnectSettings::default(),
3275            tls: ConnectTlsSpec::default(),
3276            pod_annotations: BTreeMap::new(),
3277            pod_labels: BTreeMap::new(),
3278            env: vec![],
3279            node_selector: BTreeMap::new(),
3280            tolerations: vec![],
3281            affinity: None,
3282            service_account: None,
3283            security_context: None,
3284            container_security_context: None,
3285        };
3286        assert_eq!(spec.replicas, 1);
3287    }
3288
3289    #[test]
3290    fn test_connect_phase_default() {
3291        let phase = ConnectPhase::default();
3292        assert_eq!(phase, ConnectPhase::Pending);
3293    }
3294
3295    #[test]
3296    fn test_validate_connector_type() {
3297        assert!(validate_connector_type("postgres-cdc").is_ok());
3298        assert!(validate_connector_type("mysql-cdc").is_ok());
3299        assert!(validate_connector_type("http").is_ok());
3300        assert!(validate_connector_type("stdout").is_ok());
3301        assert!(validate_connector_type("s3").is_ok());
3302        assert!(validate_connector_type("datagen").is_ok());
3303        assert!(validate_connector_type("custom-connector").is_ok());
3304    }
3305
3306    #[test]
3307    fn test_validate_start_offset() {
3308        assert!(validate_start_offset("earliest").is_ok());
3309        assert!(validate_start_offset("latest").is_ok());
3310        assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
3311        assert!(validate_start_offset("invalid").is_err());
3312    }
3313
3314    #[test]
3315    fn test_validate_image_valid() {
3316        assert!(validate_image("nginx").is_ok());
3317        assert!(validate_image("nginx:latest").is_ok());
3318        assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
3319        assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
3320        assert!(validate_image("localhost:5000/myimage").is_ok());
3321        assert!(validate_image("").is_ok()); // Empty allowed, uses default
3322    }
3323
3324    #[test]
3325    fn test_validate_image_invalid() {
3326        assert!(validate_image("/absolute/path").is_err()); // Starts with /
3327        assert!(validate_image("-invalid").is_err()); // Starts with -
3328        assert!(validate_image("image..path").is_err()); // Contains ..
3329                                                         // Very long image name
3330        let long_name = "a".repeat(300);
3331        assert!(validate_image(&long_name).is_err());
3332    }
3333
3334    #[test]
3335    fn test_validate_node_selector() {
3336        let mut selectors = BTreeMap::new();
3337        selectors.insert("node-type".to_string(), "compute".to_string());
3338        assert!(validate_node_selector(&selectors).is_ok());
3339
3340        // Too many selectors
3341        let mut many = BTreeMap::new();
3342        for i in 0..25 {
3343            many.insert(format!("key-{}", i), "value".to_string());
3344        }
3345        assert!(validate_node_selector(&many).is_err());
3346    }
3347
3348    #[test]
3349    fn test_validate_annotations() {
3350        let mut annotations = BTreeMap::new();
3351        annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
3352        assert!(validate_annotations(&annotations).is_ok());
3353
3354        // Too many annotations
3355        let mut many = BTreeMap::new();
3356        for i in 0..55 {
3357            many.insert(format!("annotation-{}", i), "value".to_string());
3358        }
3359        assert!(validate_annotations(&many).is_err());
3360    }
3361
3362    #[test]
3363    fn test_validate_labels() {
3364        let mut labels = BTreeMap::new();
3365        labels.insert("team".to_string(), "platform".to_string());
3366        assert!(validate_labels(&labels).is_ok());
3367
3368        // Reserved prefix
3369        let mut reserved = BTreeMap::new();
3370        reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
3371        assert!(validate_labels(&reserved).is_err());
3372    }
3373
3374    #[test]
3375    fn test_validate_raw_config() {
3376        let mut config = BTreeMap::new();
3377        config.insert("custom.setting".to_string(), "value".to_string());
3378        assert!(validate_raw_config(&config).is_ok());
3379
3380        // Forbidden key
3381        let mut forbidden = BTreeMap::new();
3382        forbidden.insert("command".to_string(), "/bin/sh".to_string());
3383        assert!(validate_raw_config(&forbidden).is_err());
3384
3385        // Too many entries
3386        let mut many = BTreeMap::new();
3387        for i in 0..55 {
3388            many.insert(format!("config-{}", i), "value".to_string());
3389        }
3390        assert!(validate_raw_config(&many).is_err());
3391    }
3392
3393    #[test]
3394    fn test_validate_int_or_percent() {
3395        assert!(validate_optional_int_or_percent("1").is_ok());
3396        assert!(validate_optional_int_or_percent("25%").is_ok());
3397        assert!(validate_optional_int_or_percent("100%").is_ok());
3398        assert!(validate_optional_int_or_percent("").is_ok()); // Empty allowed
3399        assert!(validate_optional_int_or_percent("abc").is_err());
3400        assert!(validate_optional_int_or_percent("25%%").is_err());
3401    }
3402
3403    #[test]
3404    fn test_tls_spec_default() {
3405        let tls = TlsSpec::default();
3406        assert!(!tls.enabled);
3407        assert!(tls.cert_secret_name.is_none());
3408        assert!(!tls.mtls_enabled);
3409    }
3410
3411    #[test]
3412    fn test_metrics_spec_default() {
3413        let metrics = MetricsSpec::default();
3414        assert!(metrics.enabled);
3415        assert_eq!(metrics.port, 9090);
3416    }
3417
3418    #[test]
3419    fn test_pdb_spec_default() {
3420        let pdb = PdbSpec::default();
3421        assert!(pdb.enabled);
3422        assert!(pdb.min_available.is_none());
3423        assert_eq!(pdb.max_unavailable, Some("1".to_string()));
3424    }
3425
3426    #[test]
3427    fn test_service_monitor_labels() {
3428        let mut labels = BTreeMap::new();
3429        labels.insert("release".to_string(), "prometheus".to_string());
3430        assert!(validate_service_monitor_labels(&labels).is_ok());
3431
3432        // Too many labels
3433        let mut many = BTreeMap::new();
3434        for i in 0..15 {
3435            many.insert(format!("label-{}", i), "value".to_string());
3436        }
3437        assert!(validate_service_monitor_labels(&many).is_err());
3438    }
3439
3440    #[test]
3441    fn test_cluster_condition_time_format() {
3442        let condition = ClusterCondition {
3443            condition_type: "Ready".to_string(),
3444            status: "True".to_string(),
3445            last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
3446            reason: Some("AllReplicasReady".to_string()),
3447            message: Some("All replicas are ready".to_string()),
3448        };
3449        assert!(condition.last_transition_time.unwrap().contains('T'));
3450    }
3451
3452    // ========================================================================
3453    // Typed Connector Config Tests
3454    // ========================================================================
3455
3456    #[test]
3457    fn test_validate_snapshot_mode() {
3458        assert!(validate_snapshot_mode("initial").is_ok());
3459        assert!(validate_snapshot_mode("never").is_ok());
3460        assert!(validate_snapshot_mode("when_needed").is_ok());
3461        assert!(validate_snapshot_mode("exported").is_ok());
3462        assert!(validate_snapshot_mode("custom").is_ok());
3463        assert!(validate_snapshot_mode("").is_ok()); // Empty allowed
3464        assert!(validate_snapshot_mode("invalid").is_err());
3465    }
3466
3467    #[test]
3468    fn test_validate_decoding_plugin() {
3469        assert!(validate_decoding_plugin("pgoutput").is_ok());
3470        assert!(validate_decoding_plugin("wal2json").is_ok());
3471        assert!(validate_decoding_plugin("decoderbufs").is_ok());
3472        assert!(validate_decoding_plugin("").is_ok()); // Empty allowed
3473        assert!(validate_decoding_plugin("invalid").is_err());
3474    }
3475
3476    #[test]
3477    fn test_validate_mysql_snapshot_mode() {
3478        assert!(validate_mysql_snapshot_mode("initial").is_ok());
3479        assert!(validate_mysql_snapshot_mode("never").is_ok());
3480        assert!(validate_mysql_snapshot_mode("when_needed").is_ok());
3481        assert!(validate_mysql_snapshot_mode("schema_only").is_ok());
3482        assert!(validate_mysql_snapshot_mode("").is_ok());
3483        assert!(validate_mysql_snapshot_mode("invalid").is_err());
3484    }
3485
3486    #[test]
3487    fn test_validate_output_format() {
3488        assert!(validate_output_format("json").is_ok());
3489        assert!(validate_output_format("jsonl").is_ok());
3490        assert!(validate_output_format("parquet").is_ok());
3491        assert!(validate_output_format("avro").is_ok());
3492        assert!(validate_output_format("").is_ok());
3493        assert!(validate_output_format("xml").is_err());
3494    }
3495
3496    #[test]
3497    fn test_validate_s3_compression() {
3498        assert!(validate_s3_compression("none").is_ok());
3499        assert!(validate_s3_compression("gzip").is_ok());
3500        assert!(validate_s3_compression("snappy").is_ok());
3501        assert!(validate_s3_compression("lz4").is_ok());
3502        assert!(validate_s3_compression("zstd").is_ok());
3503        assert!(validate_s3_compression("").is_ok());
3504        assert!(validate_s3_compression("bzip2").is_err());
3505    }
3506
3507    #[test]
3508    fn test_validate_http_method() {
3509        assert!(validate_http_method("POST").is_ok());
3510        assert!(validate_http_method("PUT").is_ok());
3511        assert!(validate_http_method("PATCH").is_ok());
3512        assert!(validate_http_method("").is_ok());
3513        assert!(validate_http_method("GET").is_err());
3514        assert!(validate_http_method("DELETE").is_err());
3515    }
3516
3517    #[test]
3518    fn test_postgres_cdc_config_default() {
3519        let config = PostgresCdcConfig::default();
3520        assert!(config.slot_name.is_none());
3521        assert!(config.publication.is_none());
3522        assert!(config.snapshot_mode.is_none());
3523    }
3524
3525    #[test]
3526    fn test_s3_sink_config_default() {
3527        let config = S3SinkConfig::default();
3528        assert!(config.bucket.is_none());
3529        assert!(config.region.is_none());
3530        assert!(config.format.is_none());
3531    }
3532
3533    #[test]
3534    fn test_table_spec_with_columns() {
3535        let table = TableSpec {
3536            schema: Some("public".to_string()),
3537            table: "orders".to_string(),
3538            topic: None,
3539            columns: vec!["id".to_string(), "customer_id".to_string()],
3540            exclude_columns: vec!["password".to_string()],
3541            column_masks: std::collections::BTreeMap::from([(
3542                "email".to_string(),
3543                "***@***.***".to_string(),
3544            )]),
3545        };
3546        assert_eq!(table.columns.len(), 2);
3547        assert_eq!(table.exclude_columns.len(), 1);
3548        assert_eq!(table.column_masks.len(), 1);
3549    }
3550
3551    // ========================================================================
3552    // Queue Connector Tests (rivven-queue)
3553    // ========================================================================
3554
3555    #[test]
3556    fn test_kafka_source_config_default() {
3557        let config = KafkaSourceConfig::default();
3558        assert!(config.brokers.is_none());
3559        assert!(config.topic.is_none());
3560        assert!(config.consumer_group.is_none());
3561    }
3562
3563    #[test]
3564    fn test_validate_kafka_start_offset() {
3565        assert!(validate_kafka_start_offset("earliest").is_ok());
3566        assert!(validate_kafka_start_offset("latest").is_ok());
3567        assert!(validate_kafka_start_offset("").is_ok());
3568        assert!(validate_kafka_start_offset("invalid").is_err());
3569    }
3570
3571    #[test]
3572    fn test_validate_kafka_security_protocol() {
3573        assert!(validate_kafka_security_protocol("plaintext").is_ok());
3574        assert!(validate_kafka_security_protocol("ssl").is_ok());
3575        assert!(validate_kafka_security_protocol("sasl_plaintext").is_ok());
3576        assert!(validate_kafka_security_protocol("sasl_ssl").is_ok());
3577        assert!(validate_kafka_security_protocol("").is_ok());
3578        assert!(validate_kafka_security_protocol("invalid").is_err());
3579    }
3580
3581    #[test]
3582    fn test_mqtt_source_config_default() {
3583        let config = MqttSourceConfig::default();
3584        assert!(config.broker_url.is_none());
3585        assert!(config.topics.is_none());
3586        assert!(config.client_id.is_none());
3587    }
3588
3589    #[test]
3590    fn test_validate_mqtt_qos() {
3591        assert!(validate_mqtt_qos("at_most_once").is_ok());
3592        assert!(validate_mqtt_qos("at_least_once").is_ok());
3593        assert!(validate_mqtt_qos("exactly_once").is_ok());
3594        assert!(validate_mqtt_qos("").is_ok());
3595        assert!(validate_mqtt_qos("invalid").is_err());
3596    }
3597
3598    #[test]
3599    fn test_sqs_source_config_default() {
3600        let config = SqsSourceConfig::default();
3601        assert!(config.queue_url.is_none());
3602        assert!(config.region.is_none());
3603        assert!(config.max_messages.is_none());
3604    }
3605
3606    #[test]
3607    fn test_pubsub_source_config_default() {
3608        let config = PubSubSourceConfig::default();
3609        assert!(config.project_id.is_none());
3610        assert!(config.subscription.is_none());
3611        assert!(config.topic.is_none());
3612    }
3613
3614    #[test]
3615    fn test_kafka_sink_config_default() {
3616        let config = KafkaSinkConfig::default();
3617        assert!(config.brokers.is_none());
3618        assert!(config.topic.is_none());
3619        assert!(config.acks.is_none());
3620    }
3621
3622    #[test]
3623    fn test_validate_kafka_acks() {
3624        assert!(validate_kafka_acks("none").is_ok());
3625        assert!(validate_kafka_acks("leader").is_ok());
3626        assert!(validate_kafka_acks("all").is_ok());
3627        assert!(validate_kafka_acks("0").is_ok());
3628        assert!(validate_kafka_acks("1").is_ok());
3629        assert!(validate_kafka_acks("-1").is_ok());
3630        assert!(validate_kafka_acks("").is_ok());
3631        assert!(validate_kafka_acks("invalid").is_err());
3632    }
3633
3634    // ========================================================================
3635    // Storage Connector Tests (rivven-storage)
3636    // ========================================================================
3637
3638    #[test]
3639    fn test_gcs_sink_config_default() {
3640        let config = GcsSinkConfig::default();
3641        assert!(config.bucket.is_none());
3642        assert!(config.prefix.is_none());
3643        assert!(config.format.is_none());
3644    }
3645
3646    #[test]
3647    fn test_azure_blob_sink_config_default() {
3648        let config = AzureBlobSinkConfig::default();
3649        assert!(config.account_name.is_none());
3650        assert!(config.container.is_none());
3651        assert!(config.format.is_none());
3652    }
3653
3654    // ========================================================================
3655    // Warehouse Connector Tests (rivven-warehouse)
3656    // ========================================================================
3657
3658    #[test]
3659    fn test_snowflake_sink_config_default() {
3660        let config = SnowflakeSinkConfig::default();
3661        assert!(config.account.is_none());
3662        assert!(config.user.is_none());
3663        assert!(config.database.is_none());
3664    }
3665
3666    #[test]
3667    fn test_bigquery_sink_config_default() {
3668        let config = BigQuerySinkConfig::default();
3669        assert!(config.project_id.is_none());
3670        assert!(config.dataset_id.is_none());
3671        assert!(config.table_id.is_none());
3672    }
3673
3674    #[test]
3675    fn test_redshift_sink_config_default() {
3676        let config = RedshiftSinkConfig::default();
3677        assert!(config.host.is_none());
3678        assert!(config.database.is_none());
3679        assert!(config.table.is_none());
3680    }
3681
3682    #[test]
3683    fn test_validate_redshift_ssl_mode() {
3684        assert!(validate_redshift_ssl_mode("disable").is_ok());
3685        assert!(validate_redshift_ssl_mode("prefer").is_ok());
3686        assert!(validate_redshift_ssl_mode("require").is_ok());
3687        assert!(validate_redshift_ssl_mode("verify-ca").is_ok());
3688        assert!(validate_redshift_ssl_mode("verify-full").is_ok());
3689        assert!(validate_redshift_ssl_mode("").is_ok());
3690        assert!(validate_redshift_ssl_mode("invalid").is_err());
3691    }
3692}