1use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use once_cell::sync::Lazy;
9use regex::Regex;
10use schemars::JsonSchema;
11use serde::{Deserialize, Serialize};
12use std::collections::BTreeMap;
13use validator::{Validate, ValidationError};
14
// Pattern used by `validate_quantity`: an unsigned decimal number (optional
// fractional part) followed by an optional binary (`Ki`..`Ei`) or decimal
// (`k`..`E`) suffix. Compiled lazily on first use.
static QUANTITY_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());
18
// Pattern used by `validate_k8s_name`: lowercase alphanumerics and `-`,
// starting and ending with an alphanumeric character. Length limits are
// enforced separately by the caller. Compiled lazily on first use.
static NAME_REGEX: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25 if QUANTITY_REGEX.is_match(value) {
26 Ok(())
27 } else {
28 Err(ValidationError::new("invalid_quantity")
29 .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30 }
31}
32
33fn validate_image(value: &str) -> Result<(), ValidationError> {
35 if value.is_empty() {
36 return Ok(()); }
38 if value.len() > 255 {
39 return Err(ValidationError::new("image_too_long")
40 .with_message("image reference exceeds 255 characters".into()));
41 }
42 if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44 return Err(ValidationError::new("invalid_image")
45 .with_message(format!("'{}' is not a valid container image", value).into()));
46 }
47 Ok(())
48}
49
50fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52 if value.is_empty() {
53 return Ok(()); }
55 if value.len() > 63 {
56 return Err(
57 ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58 );
59 }
60 if !NAME_REGEX.is_match(value) {
61 return Err(ValidationError::new("invalid_name").with_message(
62 format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63 ));
64 }
65 Ok(())
66}
67
68fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
70 const MAX_ENV_VARS: usize = 100;
72 if vars.len() > MAX_ENV_VARS {
73 return Err(ValidationError::new("too_many_env_vars").with_message(
74 format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
75 ));
76 }
77 for var in vars {
78 if var.name.is_empty() || var.name.len() > 256 {
80 return Err(ValidationError::new("invalid_env_name")
81 .with_message("environment variable name must be 1-256 characters".into()));
82 }
83 let forbidden_prefixes = ["LD_", "DYLD_", "PATH=", "HOME=", "USER="];
85 for prefix in forbidden_prefixes {
86 if var.name.starts_with(prefix) && var.value.is_some() {
87 return Err(ValidationError::new("forbidden_env_var").with_message(
88 format!(
89 "environment variable '{}' is not allowed for security",
90 var.name
91 )
92 .into(),
93 ));
94 }
95 }
96 }
97 Ok(())
98}
99
/// Spec of the namespaced `RivvenCluster` custom resource
/// (`rivven.io/v1alpha1`, plural `rivvenclusters`, short name `rc`).
///
/// Status is reported through `RivvenClusterStatus`. Fields are serialized in
/// camelCase; every field has a serde default, so an empty spec deserializes.
// NOTE(review): the short name `rc` is presumably also used by the built-in
// ReplicationController resource — confirm that the overlap is acceptable.
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.io",
    version = "v1alpha1",
    kind = "RivvenCluster",
    plural = "rivvenclusters",
    shortname = "rc",
    namespaced,
    status = "RivvenClusterStatus",
    printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
    printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
    printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterSpec {
    /// Desired replica count (1-100); defaults to `default_replicas()`.
    #[serde(default = "default_replicas")]
    #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
    pub replicas: i32,

    /// Version string (1-64 characters); used as the image tag by `get_image`
    /// when no explicit `image` is set.
    #[serde(default = "default_version")]
    #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
    pub version: String,

    /// Optional full image reference overriding the default image.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_image"))]
    pub image: Option<String>,

    /// Image pull policy: `Always`, `IfNotPresent` or `Never`.
    #[serde(default = "default_image_pull_policy")]
    #[validate(custom(function = "validate_pull_policy"))]
    pub image_pull_policy: String,

    /// Image pull secrets (at most 10 entries).
    #[serde(default)]
    #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
    pub image_pull_secrets: Vec<String>,

    /// Persistent storage settings.
    #[serde(default)]
    #[validate(nested)]
    pub storage: StorageSpec,

    /// Container resource requirements; excluded from the generated schema.
    #[serde(default)]
    #[schemars(skip)]
    pub resources: Option<ResourceRequirements>,

    /// Broker configuration.
    #[serde(default)]
    #[validate(nested)]
    pub config: BrokerConfig,

    /// TLS settings.
    #[serde(default)]
    #[validate(nested)]
    pub tls: TlsSpec,

    /// Metrics settings.
    #[serde(default)]
    #[validate(nested)]
    pub metrics: MetricsSpec,

    /// Pod affinity rules; excluded from the generated schema.
    #[serde(default)]
    #[schemars(skip)]
    pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,

    /// Node selector (at most 20 entries; key <= 253 bytes, value <= 63 bytes).
    #[serde(default)]
    #[validate(custom(function = "validate_node_selector"))]
    pub node_selector: BTreeMap<String, String>,

    /// Pod tolerations; excluded from the generated schema and not validated here.
    #[serde(default)]
    #[schemars(skip)]
    pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,

    /// Pod disruption budget settings.
    #[serde(default)]
    #[validate(nested)]
    pub pod_disruption_budget: PdbSpec,

    /// Optional service account name (validated as a Kubernetes name).
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub service_account: Option<String>,

    /// Extra pod annotations (limits enforced by `validate_annotations`).
    #[serde(default)]
    #[validate(custom(function = "validate_annotations"))]
    pub pod_annotations: BTreeMap<String, String>,

    /// Extra pod labels; keys under `app.kubernetes.io/` are rejected by
    /// `validate_labels`.
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub pod_labels: BTreeMap<String, String>,

    /// Extra environment variables (checked by `validate_env_vars`);
    /// excluded from the generated schema.
    #[serde(default)]
    #[schemars(skip)]
    #[validate(custom(function = "validate_env_vars"))]
    pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,

    /// Liveness probe settings.
    #[serde(default)]
    #[validate(nested)]
    pub liveness_probe: ProbeSpec,

    /// Readiness probe settings.
    #[serde(default)]
    #[validate(nested)]
    pub readiness_probe: ProbeSpec,

    /// Pod-level security context; excluded from the generated schema.
    #[serde(default)]
    #[schemars(skip)]
    pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,

    /// Container-level security context; excluded from the generated schema.
    #[serde(default)]
    #[schemars(skip)]
    pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
}
233
/// Validator attached to the optional `image` fields; forwards to
/// `validate_image`.
// NOTE(review): takes `&str`, so presumably the `validator` derive only calls
// it when the `Option<String>` is `Some` — confirm against the crate docs.
fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
    validate_image(image)
}
238
239fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
241 match policy {
242 "Always" | "IfNotPresent" | "Never" => Ok(()),
243 _ => Err(ValidationError::new("invalid_pull_policy")
244 .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
245 }
246}
247
248fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
250 if selectors.len() > 20 {
251 return Err(ValidationError::new("too_many_selectors")
252 .with_message("maximum 20 node selectors allowed".into()));
253 }
254 for (key, value) in selectors {
255 if key.len() > 253 || value.len() > 63 {
256 return Err(ValidationError::new("selector_too_long")
257 .with_message("selector key max 253 chars, value max 63 chars".into()));
258 }
259 }
260 Ok(())
261}
262
263fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
265 if name.is_empty() {
266 return Ok(()); }
268 validate_k8s_name(name)
269}
270
271fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
273 if annotations.len() > 50 {
274 return Err(ValidationError::new("too_many_annotations")
275 .with_message("maximum 50 annotations allowed".into()));
276 }
277 for (key, value) in annotations {
278 if key.len() > 253 {
280 return Err(ValidationError::new("annotation_key_too_long")
281 .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
282 }
283 if value.len() > 262144 {
285 return Err(ValidationError::new("annotation_value_too_long")
286 .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
287 }
288 }
289 Ok(())
290}
291
292fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
294 if labels.len() > 20 {
295 return Err(ValidationError::new("too_many_labels")
296 .with_message("maximum 20 labels allowed".into()));
297 }
298 for (key, value) in labels {
299 if key.len() > 253 || value.len() > 63 {
300 return Err(ValidationError::new("label_too_long")
301 .with_message("label key max 253 chars, value max 63 chars".into()));
302 }
303 if key.starts_with("app.kubernetes.io/") {
305 return Err(ValidationError::new("reserved_label").with_message(
306 format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
307 ));
308 }
309 }
310 Ok(())
311}
312
/// Persistent storage settings of a cluster (serialized in camelCase).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StorageSpec {
    /// Requested size as a quantity string (checked by `validate_quantity`);
    /// defaults to `default_storage_size()`.
    #[serde(default = "default_storage_size")]
    #[validate(custom(function = "validate_quantity"))]
    pub size: String,

    /// Optional storage class name (validated as a Kubernetes name).
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub storage_class_name: Option<String>,

    /// Volume access modes: 1-3 entries, each accepted by
    /// `validate_access_modes`; defaults to `default_access_modes()`.
    #[serde(default = "default_access_modes")]
    #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
    #[validate(custom(function = "validate_access_modes"))]
    pub access_modes: Vec<String>,
}
333
334fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
336 let valid_modes = [
337 "ReadWriteOnce",
338 "ReadOnlyMany",
339 "ReadWriteMany",
340 "ReadWriteOncePod",
341 ];
342 for mode in modes {
343 if !valid_modes.contains(&mode.as_str()) {
344 return Err(ValidationError::new("invalid_access_mode")
345 .with_message(format!("'{}' is not a valid access mode", mode).into()));
346 }
347 }
348 Ok(())
349}
350
351impl Default for StorageSpec {
352 fn default() -> Self {
353 Self {
354 size: default_storage_size(),
355 storage_class_name: None,
356 access_modes: default_access_modes(),
357 }
358 }
359}
360
/// Broker configuration of a cluster (serialized in camelCase).
///
/// Every field has a serde default function; the validators enforce the
/// ranges stated in each message.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct BrokerConfig {
    /// Default partition count (1-1000).
    #[serde(default = "default_partitions")]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub default_partitions: i32,

    /// Default replication factor (1-10).
    #[serde(default = "default_replication_factor")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub default_replication_factor: i32,

    /// Log retention in hours (1-8760).
    #[serde(default = "default_log_retention_hours")]
    #[validate(range(
        min = 1,
        max = 8760,
        message = "retention hours must be between 1 and 8760"
    ))]
    pub log_retention_hours: i32,

    /// Log segment size in bytes (bounds in `validate_segment_size`).
    #[serde(default = "default_log_segment_bytes")]
    #[validate(custom(function = "validate_segment_size"))]
    pub log_segment_bytes: i64,

    /// Maximum message size in bytes (bounds in `validate_message_size`).
    #[serde(default = "default_max_message_bytes")]
    #[validate(custom(function = "validate_message_size"))]
    pub max_message_bytes: i64,

    /// Whether topics are created automatically; defaults to `true`.
    #[serde(default = "default_true")]
    pub auto_create_topics: bool,

    /// Whether compression is enabled; defaults to `true`.
    #[serde(default = "default_true")]
    pub compression_enabled: bool,

    /// Compression codec name (allowed values in `validate_compression_type`).
    #[serde(default = "default_compression")]
    #[validate(custom(function = "validate_compression_type"))]
    pub compression_type: String,

    /// Raft election timeout in milliseconds (100-60000).
    #[serde(default = "default_election_timeout")]
    #[validate(range(
        min = 100,
        max = 60000,
        message = "election timeout must be between 100ms and 60s"
    ))]
    pub raft_election_timeout_ms: i32,

    /// Raft heartbeat interval in milliseconds (10-10000).
    // NOTE(review): nothing here checks that the heartbeat interval is
    // smaller than the election timeout — confirm whether that is enforced
    // elsewhere.
    #[serde(default = "default_heartbeat_interval")]
    #[validate(range(
        min = 10,
        max = 10000,
        message = "heartbeat interval must be between 10ms and 10s"
    ))]
    pub raft_heartbeat_interval_ms: i32,

    /// Free-form key/value configuration (limits and forbidden keys in
    /// `validate_raw_config`).
    #[serde(default)]
    #[validate(custom(function = "validate_raw_config"))]
    pub raw: BTreeMap<String, String>,
}
434
435fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
437 match compression {
438 "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
439 _ => Err(ValidationError::new("invalid_compression")
440 .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
441 }
442}
443
444fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
446 const MIN_SEGMENT_SIZE: i64 = 1_048_576; const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
449 return Err(ValidationError::new("invalid_segment_size")
450 .with_message("segment size must be between 1MB and 10GB".into()));
451 }
452 Ok(())
453}
454
455fn validate_message_size(size: i64) -> Result<(), ValidationError> {
457 const MIN_MESSAGE_SIZE: i64 = 1_024; const MAX_MESSAGE_SIZE: i64 = 104_857_600; if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
460 return Err(ValidationError::new("invalid_message_size")
461 .with_message("max message size must be between 1KB and 100MB".into()));
462 }
463 Ok(())
464}
465
466fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
468 if config.len() > 50 {
469 return Err(ValidationError::new("too_many_raw_configs")
470 .with_message("maximum 50 raw configuration entries allowed".into()));
471 }
472 for (key, value) in config {
473 if key.len() > 128 || value.len() > 4096 {
474 return Err(ValidationError::new("raw_config_too_long")
475 .with_message("raw config key max 128 chars, value max 4096 chars".into()));
476 }
477 let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
479 if forbidden_keys.contains(&key.as_str()) {
480 return Err(ValidationError::new("forbidden_raw_config")
481 .with_message(format!("raw config key '{}' is not allowed", key).into()));
482 }
483 }
484 Ok(())
485}
486
487impl Default for BrokerConfig {
488 fn default() -> Self {
489 Self {
490 default_partitions: default_partitions(),
491 default_replication_factor: default_replication_factor(),
492 log_retention_hours: default_log_retention_hours(),
493 log_segment_bytes: default_log_segment_bytes(),
494 max_message_bytes: default_max_message_bytes(),
495 auto_create_topics: default_true(),
496 compression_enabled: default_true(),
497 compression_type: default_compression(),
498 raft_election_timeout_ms: default_election_timeout(),
499 raft_heartbeat_interval_ms: default_heartbeat_interval(),
500 raw: BTreeMap::new(),
501 }
502 }
503}
504
/// TLS settings (serialized in camelCase). The derived `Default` disables
/// both flags and leaves both secret names unset.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TlsSpec {
    /// Whether TLS is enabled; defaults to `false`.
    #[serde(default)]
    pub enabled: bool,

    /// Optional name of the certificate secret (validated as a Kubernetes name).
    // NOTE(review): nothing here requires this to be set when `enabled` is
    // true — confirm whether that is checked elsewhere.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub cert_secret_name: Option<String>,

    /// Whether mutual TLS is enabled; defaults to `false`.
    #[serde(default)]
    pub mtls_enabled: bool,

    /// Optional name of the CA secret (validated as a Kubernetes name).
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub ca_secret_name: Option<String>,
}
527
/// Metrics settings (serialized in camelCase).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MetricsSpec {
    /// Whether metrics are enabled; defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Metrics port (1024-65535); defaults to `default_metrics_port()`.
    #[serde(default = "default_metrics_port")]
    #[validate(range(
        min = 1024,
        max = 65535,
        message = "metrics port must be between 1024 and 65535"
    ))]
    pub port: i32,

    /// ServiceMonitor settings.
    #[serde(default)]
    #[validate(nested)]
    pub service_monitor: ServiceMonitorSpec,
}
550
551impl Default for MetricsSpec {
552 fn default() -> Self {
553 Self {
554 enabled: true,
555 port: 9090,
556 service_monitor: ServiceMonitorSpec::default(),
557 }
558 }
559}
560
561#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
563#[serde(rename_all = "camelCase")]
564pub struct ServiceMonitorSpec {
565 #[serde(default)]
567 pub enabled: bool,
568
569 #[serde(default)]
571 #[validate(custom(function = "validate_optional_k8s_name"))]
572 pub namespace: Option<String>,
573
574 #[serde(default = "default_scrape_interval")]
576 #[validate(custom(function = "validate_duration"))]
577 pub interval: String,
578
579 #[serde(default)]
581 #[validate(custom(function = "validate_service_monitor_labels"))]
582 pub labels: BTreeMap<String, String>,
583}
584
585fn validate_duration(duration: &str) -> Result<(), ValidationError> {
587 static DURATION_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
588 if !DURATION_REGEX.is_match(duration) {
589 return Err(ValidationError::new("invalid_duration").with_message(
590 format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
591 ));
592 }
593 Ok(())
594}
595
596fn validate_service_monitor_labels(
598 labels: &BTreeMap<String, String>,
599) -> Result<(), ValidationError> {
600 if labels.len() > 10 {
601 return Err(ValidationError::new("too_many_labels")
602 .with_message("maximum 10 ServiceMonitor labels allowed".into()));
603 }
604 Ok(())
605}
606
/// Pod disruption budget settings (serialized in camelCase).
// NOTE(review): nothing here prevents `min_available` and `max_unavailable`
// from both being set — confirm whether that is handled elsewhere.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PdbSpec {
    /// Whether the budget is enabled; defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Optional minimum available pods, as an integer or percentage string.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub min_available: Option<String>,

    /// Optional maximum unavailable pods, as an integer or percentage string;
    /// defaults to `default_max_unavailable()`.
    #[serde(default = "default_max_unavailable")]
    #[validate(custom(function = "validate_optional_int_or_percent"))]
    pub max_unavailable: Option<String>,
}
627
628fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
630 if value.is_empty() {
631 return Ok(());
632 }
633 static INT_OR_PERCENT_REGEX: Lazy<Regex> =
635 Lazy::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
636 if !INT_OR_PERCENT_REGEX.is_match(value) {
637 return Err(ValidationError::new("invalid_int_or_percent").with_message(
638 format!(
639 "'{}' must be an integer or percentage (e.g., '1' or '25%')",
640 value
641 )
642 .into(),
643 ));
644 }
645 Ok(())
646}
647
648impl Default for PdbSpec {
649 fn default() -> Self {
650 Self {
651 enabled: true,
652 min_available: None,
653 max_unavailable: Some("1".to_string()),
654 }
655 }
656}
657
/// Probe settings shared by the liveness and readiness probes (serialized in
/// camelCase).
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ProbeSpec {
    /// Whether the probe is enabled; defaults to `true`.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Initial delay in seconds (0-3600).
    #[serde(default = "default_initial_delay")]
    #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
    pub initial_delay_seconds: i32,

    /// Probe period in seconds (1-300).
    #[serde(default = "default_period")]
    #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
    pub period_seconds: i32,

    /// Probe timeout in seconds (1-60).
    #[serde(default = "default_timeout")]
    #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
    pub timeout_seconds: i32,

    /// Success threshold (1-10).
    #[serde(default = "default_one")]
    #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
    pub success_threshold: i32,

    /// Failure threshold (1-30).
    #[serde(default = "default_three")]
    #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
    pub failure_threshold: i32,
}
691
692impl Default for ProbeSpec {
693 fn default() -> Self {
694 Self {
695 enabled: true,
696 initial_delay_seconds: 30,
697 period_seconds: 10,
698 timeout_seconds: 5,
699 success_threshold: 1,
700 failure_threshold: 3,
701 }
702 }
703}
704
/// Status sub-resource of `RivvenCluster` (serialized in camelCase).
///
/// Only `conditions` and `broker_endpoints` carry a serde default; the other
/// non-optional fields must be present when deserializing.
// NOTE(review): field meanings below are inferred from their names and the
// print columns — confirm against the controller that writes the status.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenClusterStatus {
    /// Current phase of the cluster.
    pub phase: ClusterPhase,

    /// Replica count.
    pub replicas: i32,

    /// Ready replica count (shown in the `Ready` print column).
    pub ready_replicas: i32,

    /// Updated replica count.
    pub updated_replicas: i32,

    /// Observed generation — presumably the last `metadata.generation` seen.
    pub observed_generation: i64,

    /// Status conditions.
    #[serde(default)]
    pub conditions: Vec<ClusterCondition>,

    /// Broker endpoints.
    #[serde(default)]
    pub broker_endpoints: Vec<String>,

    /// Leader, if any.
    pub leader: Option<String>,

    /// Last update time as a string (format not established here).
    pub last_updated: Option<String>,

    /// Optional free-form message.
    pub message: Option<String>,
}
741
/// Phase reported in `RivvenClusterStatus::phase`.
///
/// There is no serde rename, so variants serialize under their Rust names.
// NOTE(review): the exact meaning of each phase is defined by the controller,
// which is not visible here; the names are taken at face value.
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
pub enum ClusterPhase {
    /// Default phase.
    #[default]
    Pending,
    Provisioning,
    Running,
    Updating,
    Degraded,
    Failed,
    Terminating,
}
761
/// A single status condition (serialized in camelCase).
// NOTE(review): presumably follows the usual Kubernetes condition convention
// (type / status / reason / message) — confirm against the controller.
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterCondition {
    /// Condition type; serialized as `type`.
    #[serde(rename = "type")]
    pub condition_type: String,

    /// Condition status as a string.
    pub status: String,

    /// Optional reason.
    pub reason: Option<String>,

    /// Optional message.
    pub message: Option<String>,

    /// Optional last transition time as a string (format not established here).
    pub last_transition_time: Option<String>,
}
782
/// Serde default for `RivvenClusterSpec::replicas`.
fn default_replicas() -> i32 {
    const DEFAULT_REPLICAS: i32 = 3;
    DEFAULT_REPLICAS
}
787
/// Serde default for the `version` fields.
fn default_version() -> String {
    String::from("0.0.1")
}
791
/// Serde default for the `image_pull_policy` fields.
fn default_image_pull_policy() -> String {
    String::from("IfNotPresent")
}
795
/// Serde default for `StorageSpec::size`.
fn default_storage_size() -> String {
    String::from("10Gi")
}
799
/// Serde default for `StorageSpec::access_modes`: a single `ReadWriteOnce`.
fn default_access_modes() -> Vec<String> {
    let mut modes = Vec::with_capacity(1);
    modes.push(String::from("ReadWriteOnce"));
    modes
}
803
/// Serde default for `BrokerConfig::default_partitions`.
fn default_partitions() -> i32 {
    const DEFAULT_PARTITIONS: i32 = 3;
    DEFAULT_PARTITIONS
}
807
/// Serde default for `BrokerConfig::default_replication_factor`.
fn default_replication_factor() -> i32 {
    const DEFAULT_REPLICATION_FACTOR: i32 = 2;
    DEFAULT_REPLICATION_FACTOR
}
811
/// Serde default for `BrokerConfig::log_retention_hours`: 168 hours (7 days).
fn default_log_retention_hours() -> i32 {
    const HOURS_PER_DAY: i32 = 24;
    7 * HOURS_PER_DAY
}
815
/// Serde default for `BrokerConfig::log_segment_bytes`: 1 GiB.
fn default_log_segment_bytes() -> i64 {
    const GIB: i64 = 1024 * 1024 * 1024;
    GIB
}
819
/// Serde default for the `max_message_bytes` fields: 1 MiB.
fn default_max_message_bytes() -> i64 {
    const MIB: i64 = 1024 * 1024;
    MIB
}
823
/// Serde default for `BrokerConfig::compression_type`.
fn default_compression() -> String {
    String::from("lz4")
}
827
/// Serde default for `BrokerConfig::raft_election_timeout_ms`: 1000 ms.
fn default_election_timeout() -> i32 {
    const ELECTION_TIMEOUT_MS: i32 = 1_000;
    ELECTION_TIMEOUT_MS
}
831
/// Serde default for `BrokerConfig::raft_heartbeat_interval_ms`: 100 ms.
fn default_heartbeat_interval() -> i32 {
    const HEARTBEAT_INTERVAL_MS: i32 = 100;
    HEARTBEAT_INTERVAL_MS
}
835
/// Serde default for `MetricsSpec::port`.
fn default_metrics_port() -> i32 {
    const METRICS_PORT: i32 = 9090;
    METRICS_PORT
}
839
/// Serde default for `ServiceMonitorSpec::interval`.
fn default_scrape_interval() -> String {
    String::from("30s")
}
843
/// Serde default for `PdbSpec::max_unavailable`: one pod.
fn default_max_unavailable() -> Option<String> {
    let one_pod = String::from("1");
    Some(one_pod)
}
847
/// Serde default for `ProbeSpec::initial_delay_seconds`.
fn default_initial_delay() -> i32 {
    const INITIAL_DELAY_SECONDS: i32 = 30;
    INITIAL_DELAY_SECONDS
}
851
/// Serde default for `ProbeSpec::period_seconds`.
fn default_period() -> i32 {
    const PERIOD_SECONDS: i32 = 10;
    PERIOD_SECONDS
}
855
/// Serde default for `ProbeSpec::timeout_seconds`.
fn default_timeout() -> i32 {
    const TIMEOUT_SECONDS: i32 = 5;
    TIMEOUT_SECONDS
}
859
/// Serde default yielding `1` (used for `ProbeSpec::success_threshold`).
fn default_one() -> i32 {
    const ONE: i32 = 1;
    ONE
}
863
/// Serde default yielding `3` (used for `ProbeSpec::failure_threshold`).
fn default_three() -> i32 {
    const THREE: i32 = 3;
    THREE
}
867
/// Serde default for boolean fields that are on unless disabled.
fn default_true() -> bool {
    const ENABLED: bool = true;
    ENABLED
}
871
872impl RivvenClusterSpec {
873 pub fn get_image(&self) -> String {
875 if let Some(ref image) = self.image {
876 image.clone()
877 } else {
878 format!("ghcr.io/hupe1980/rivven:{}", self.version)
879 }
880 }
881
882 pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
884 let mut labels = BTreeMap::new();
885 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
886 labels.insert(
887 "app.kubernetes.io/instance".to_string(),
888 cluster_name.to_string(),
889 );
890 labels.insert(
891 "app.kubernetes.io/component".to_string(),
892 "broker".to_string(),
893 );
894 labels.insert(
895 "app.kubernetes.io/managed-by".to_string(),
896 "rivven-operator".to_string(),
897 );
898 labels.insert(
899 "app.kubernetes.io/version".to_string(),
900 self.version.clone(),
901 );
902 labels
903 }
904
905 pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
907 let mut labels = BTreeMap::new();
908 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
909 labels.insert(
910 "app.kubernetes.io/instance".to_string(),
911 cluster_name.to_string(),
912 );
913 labels
914 }
915}
916
917#[allow(dead_code)]
930#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
931#[kube(
932 group = "rivven.io",
933 version = "v1alpha1",
934 kind = "RivvenConnect",
935 plural = "rivvenconnects",
936 shortname = "rc",
937 namespaced,
938 status = "RivvenConnectStatus",
939 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
940 printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
941 printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
942 printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
943 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
944 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
945)]
946#[serde(rename_all = "camelCase")]
947pub struct RivvenConnectSpec {
948 #[validate(nested)]
950 pub cluster_ref: ClusterReference,
951
952 #[serde(default = "default_connect_replicas")]
954 #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
955 pub replicas: i32,
956
957 #[serde(default = "default_version")]
959 pub version: String,
960
961 #[serde(default)]
963 #[validate(custom(function = "validate_optional_image"))]
964 pub image: Option<String>,
965
966 #[serde(default = "default_image_pull_policy")]
968 #[validate(custom(function = "validate_pull_policy"))]
969 pub image_pull_policy: String,
970
971 #[serde(default)]
973 pub image_pull_secrets: Vec<String>,
974
975 #[serde(default)]
977 pub resources: Option<serde_json::Value>,
978
979 #[serde(default)]
981 #[validate(nested)]
982 pub config: ConnectConfigSpec,
983
984 #[serde(default)]
986 #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
987 pub sources: Vec<SourceConnectorSpec>,
988
989 #[serde(default)]
991 #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
992 pub sinks: Vec<SinkConnectorSpec>,
993
994 #[serde(default)]
996 #[validate(nested)]
997 pub settings: GlobalConnectSettings,
998
999 #[serde(default)]
1001 #[validate(nested)]
1002 pub tls: ConnectTlsSpec,
1003
1004 #[serde(default)]
1006 #[validate(custom(function = "validate_annotations"))]
1007 pub pod_annotations: BTreeMap<String, String>,
1008
1009 #[serde(default)]
1011 #[validate(custom(function = "validate_labels"))]
1012 pub pod_labels: BTreeMap<String, String>,
1013
1014 #[serde(default)]
1016 #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
1017 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
1018
1019 #[serde(default)]
1021 pub node_selector: BTreeMap<String, String>,
1022
1023 #[serde(default)]
1025 #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
1026 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
1027
1028 #[serde(default)]
1030 pub affinity: Option<serde_json::Value>,
1031
1032 #[serde(default)]
1034 #[validate(custom(function = "validate_optional_k8s_name"))]
1035 pub service_account: Option<String>,
1036
1037 #[serde(default)]
1039 pub security_context: Option<serde_json::Value>,
1040
1041 #[serde(default)]
1043 pub container_security_context: Option<serde_json::Value>,
1044}
1045
/// Reference to a cluster by name and optional namespace (serialized in
/// camelCase).
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct ClusterReference {
    /// Cluster name: required, 1-63 characters, validated as a Kubernetes name.
    #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Optional namespace (validated as a Kubernetes name).
    // NOTE(review): presumably falls back to the referencing resource's
    // namespace when unset — confirm against the controller.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub namespace: Option<String>,
}
1061
/// Spec of the namespaced `RivvenTopic` custom resource
/// (`rivven.io/v1alpha1`, plural `rivventopics`, short name `rt`).
///
/// Status is reported through `RivvenTopicStatus`. Fields are serialized in
/// camelCase; every field except `cluster_ref` has a serde default.
#[allow(dead_code)]
#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[kube(
    group = "rivven.io",
    version = "v1alpha1",
    kind = "RivvenTopic",
    plural = "rivventopics",
    shortname = "rt",
    namespaced,
    status = "RivvenTopicStatus",
    printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
    printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
    printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
    printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
    printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicSpec {
    /// Reference to the cluster that owns the topic (required).
    #[validate(nested)]
    pub cluster_ref: ClusterReference,

    /// Partition count (1-10000).
    #[serde(default = "default_rivven_topic_partitions")]
    #[validate(range(
        min = 1,
        max = 10000,
        message = "partitions must be between 1 and 10000"
    ))]
    pub partitions: i32,

    /// Replication factor (1-10).
    #[serde(default = "default_rivven_topic_replication")]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: i32,

    /// Topic configuration.
    #[serde(default)]
    #[validate(nested)]
    pub config: TopicConfig,

    /// Access control entries (at most 100; also checked by
    /// `validate_topic_acls`).
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
    #[validate(custom(function = "validate_topic_acls"))]
    pub acls: Vec<TopicAcl>,

    /// Defaults to `true`.
    // NOTE(review): presumably controls whether the topic is deleted when
    // this resource is removed — confirm against the controller.
    #[serde(default = "default_true")]
    pub delete_on_remove: bool,

    /// Topic labels (limits enforced by `validate_labels`).
    #[serde(default)]
    #[validate(custom(function = "validate_labels"))]
    pub topic_labels: BTreeMap<String, String>,
}
1159
/// Serde default for `RivvenTopicSpec::partitions`.
#[allow(dead_code)]
fn default_rivven_topic_partitions() -> i32 {
    const TOPIC_PARTITIONS: i32 = 3;
    TOPIC_PARTITIONS
}
1164
/// Serde default for `RivvenTopicSpec::replication_factor`.
#[allow(dead_code)]
fn default_rivven_topic_replication() -> i32 {
    const TOPIC_REPLICATION: i32 = 1;
    TOPIC_REPLICATION
}
1169
1170#[allow(dead_code)]
1172#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1173#[serde(rename_all = "camelCase")]
1174pub struct TopicConfig {
1175 #[serde(default = "default_topic_retention_ms")]
1178 #[validate(range(
1179 min = 3600000,
1180 max = 315360000000_i64,
1181 message = "retention must be between 1 hour and 10 years"
1182 ))]
1183 pub retention_ms: i64,
1184
1185 #[serde(default = "default_topic_retention_bytes")]
1188 #[validate(custom(function = "validate_topic_retention_bytes"))]
1189 pub retention_bytes: i64,
1190
1191 #[serde(default = "default_topic_segment_bytes")]
1193 #[validate(custom(function = "validate_segment_size"))]
1194 pub segment_bytes: i64,
1195
1196 #[serde(default = "default_topic_cleanup_policy")]
1198 #[validate(custom(function = "validate_topic_cleanup_policy"))]
1199 pub cleanup_policy: String,
1200
1201 #[serde(default = "default_topic_compression")]
1203 #[validate(custom(function = "validate_topic_compression"))]
1204 pub compression_type: String,
1205
1206 #[serde(default = "default_topic_min_isr")]
1208 #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1209 pub min_insync_replicas: i32,
1210
1211 #[serde(default = "default_max_message_bytes")]
1213 #[validate(custom(function = "validate_message_size"))]
1214 pub max_message_bytes: i64,
1215
1216 #[serde(default = "default_true")]
1218 pub message_timestamp_enabled: bool,
1219
1220 #[serde(default = "default_topic_timestamp_type")]
1222 #[validate(custom(function = "validate_topic_timestamp_type"))]
1223 pub message_timestamp_type: String,
1224
1225 #[serde(default = "default_true")]
1227 pub idempotent_writes: bool,
1228
1229 #[serde(default)]
1231 #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1232 pub flush_interval_ms: i64,
1233
1234 #[serde(default)]
1236 #[validate(custom(function = "validate_topic_custom_config"))]
1237 pub custom: BTreeMap<String, String>,
1238}
1239
/// Serde default for `TopicConfig::retention_ms`: 7 days in milliseconds.
#[allow(dead_code)]
fn default_topic_retention_ms() -> i64 {
    const MS_PER_DAY: i64 = 24 * 60 * 60 * 1000;
    7 * MS_PER_DAY
}
1244
/// Serde default for `TopicConfig::retention_bytes`: `-1`, which
/// `validate_topic_retention_bytes` treats as unlimited.
#[allow(dead_code)]
fn default_topic_retention_bytes() -> i64 {
    const UNLIMITED: i64 = -1;
    UNLIMITED
}
1249
/// Serde default for `TopicConfig::segment_bytes`: 1 GiB.
#[allow(dead_code)]
fn default_topic_segment_bytes() -> i64 {
    const GIB: i64 = 1024 * 1024 * 1024;
    GIB
}
1254
/// Serde default for `TopicConfig::cleanup_policy`.
#[allow(dead_code)]
fn default_topic_cleanup_policy() -> String {
    String::from("delete")
}
1259
/// Serde default for `TopicConfig::compression_type`.
#[allow(dead_code)]
fn default_topic_compression() -> String {
    String::from("lz4")
}
1264
/// Serde default for `TopicConfig::min_insync_replicas`.
#[allow(dead_code)]
fn default_topic_min_isr() -> i32 {
    const MIN_INSYNC_REPLICAS: i32 = 1;
    MIN_INSYNC_REPLICAS
}
1269
/// Serde default for `TopicConfig::message_timestamp_type`.
#[allow(dead_code)]
fn default_topic_timestamp_type() -> String {
    String::from("CreateTime")
}
1274
1275#[allow(dead_code)]
1276fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1277 if value == -1 || (1048576..=10995116277760).contains(&value) {
1278 Ok(())
1279 } else {
1280 Err(ValidationError::new("invalid_retention_bytes")
1281 .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1282 }
1283}
1284
1285#[allow(dead_code)]
1286fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1287 match policy {
1288 "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1289 _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1290 "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1291 )),
1292 }
1293}
1294
1295#[allow(dead_code)]
1296fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1297 match compression {
1298 "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1299 _ => Err(ValidationError::new("invalid_compression")
1300 .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1301 }
1302}
1303
1304#[allow(dead_code)]
1305fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1306 match ts_type {
1307 "CreateTime" | "LogAppendTime" => Ok(()),
1308 _ => Err(ValidationError::new("invalid_timestamp_type")
1309 .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1310 }
1311}
1312
1313#[allow(dead_code)]
1314fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1315 if config.len() > 50 {
1316 return Err(ValidationError::new("too_many_custom_configs")
1317 .with_message("maximum 50 custom config entries allowed".into()));
1318 }
1319 for (key, value) in config {
1320 if key.len() > 128 || value.len() > 4096 {
1321 return Err(ValidationError::new("config_too_long")
1322 .with_message("config key max 128 chars, value max 4096 chars".into()));
1323 }
1324 let protected = [
1326 "retention.ms",
1327 "retention.bytes",
1328 "segment.bytes",
1329 "cleanup.policy",
1330 ];
1331 if protected.contains(&key.as_str()) {
1332 return Err(ValidationError::new("protected_config").with_message(
1333 format!(
1334 "'{}' must be set via dedicated field, not custom config",
1335 key
1336 )
1337 .into(),
1338 ));
1339 }
1340 }
1341 Ok(())
1342}
1343
/// A single access-control entry for a topic.
///
/// Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TopicAcl {
    /// Principal the entry applies to; 1-256 characters, additionally checked
    /// by `validate_principal`.
    #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
    #[validate(custom(function = "validate_principal"))]
    pub principal: String,

    /// Operations covered by the entry; every element is checked by
    /// `validate_operations`.
    // NOTE(review): the length rule caps the list at 7 entries while
    // `validate_operations` recognises 9 operation names — confirm the cap is intended.
    #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
    #[validate(custom(function = "validate_operations"))]
    pub operations: Vec<String>,

    /// Permission type; falls back to `default_permission_type` when omitted
    /// and is checked by `validate_permission_type`.
    #[serde(default = "default_permission_type")]
    #[validate(custom(function = "validate_permission_type"))]
    pub permission_type: String,

    /// Host the entry applies to; falls back to `default_acl_host` when
    /// omitted, at most 256 characters.
    #[serde(default = "default_acl_host")]
    #[validate(length(max = 256, message = "host must be max 256 characters"))]
    pub host: String,
}
1369
/// Default ACL permission type: `"Allow"`.
#[allow(dead_code)]
fn default_permission_type() -> String {
    String::from("Allow")
}
1374
/// Default ACL host: the wildcard `"*"`.
#[allow(dead_code)]
fn default_acl_host() -> String {
    String::from("*")
}
1379
1380#[allow(dead_code)]
1381fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1382 if principal == "*" {
1383 return Ok(());
1384 }
1385 if let Some((prefix, name)) = principal.split_once(':') {
1386 if !["user", "group", "User", "Group"].contains(&prefix) {
1387 return Err(ValidationError::new("invalid_principal_prefix")
1388 .with_message("principal prefix must be 'user:' or 'group:'".into()));
1389 }
1390 if name.is_empty() || name.len() > 128 {
1391 return Err(ValidationError::new("invalid_principal_name")
1392 .with_message("principal name must be 1-128 characters".into()));
1393 }
1394 Ok(())
1395 } else {
1396 Err(ValidationError::new("invalid_principal_format")
1397 .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1398 }
1399}
1400
1401fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1402 let valid_ops = [
1403 "Read",
1404 "Write",
1405 "Create",
1406 "Delete",
1407 "Alter",
1408 "Describe",
1409 "All",
1410 "DescribeConfigs",
1411 "AlterConfigs",
1412 ];
1413 for op in ops {
1414 if !valid_ops.contains(&op.as_str()) {
1415 return Err(ValidationError::new("invalid_operation").with_message(
1416 format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1417 ));
1418 }
1419 }
1420 Ok(())
1421}
1422
1423#[allow(dead_code)]
1424fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1425 match perm {
1426 "Allow" | "Deny" => Ok(()),
1427 _ => Err(ValidationError::new("invalid_permission_type")
1428 .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1429 }
1430}
1431
1432#[allow(dead_code)]
1433fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1434 let mut seen = std::collections::HashSet::new();
1436 for acl in acls {
1437 for op in &acl.operations {
1438 let key = format!("{}:{}", acl.principal, op);
1439 if !seen.insert(key.clone()) {
1440 return Err(ValidationError::new("duplicate_acl")
1441 .with_message(format!("duplicate ACL entry for {}", key).into()));
1442 }
1443 }
1444 }
1445 Ok(())
1446}
1447
/// Status block for a topic resource.
///
/// Every field is `#[serde(default)]`, so a missing key deserializes to the
/// field type's default. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RivvenTopicStatus {
    /// Phase of the topic, stored as a plain string.
    #[serde(default)]
    pub phase: String,

    /// Free-form status message.
    #[serde(default)]
    pub message: String,

    /// Partition count recorded in the status.
    #[serde(default)]
    pub current_partitions: i32,

    /// Replication factor recorded in the status.
    #[serde(default)]
    pub current_replication_factor: i32,

    /// Whether the topic exists.
    #[serde(default)]
    pub topic_exists: bool,

    /// Resource generation recorded in the status.
    // NOTE(review): presumably the `metadata.generation` last reconciled — confirm.
    #[serde(default)]
    pub observed_generation: i64,

    /// Condition entries attached to the status.
    #[serde(default)]
    pub conditions: Vec<TopicCondition>,

    /// Last sync time as a string, if any; the format is not enforced here.
    #[serde(default)]
    pub last_sync_time: Option<String>,

    /// Per-partition details.
    #[serde(default)]
    pub partition_info: Vec<PartitionInfo>,
}
1489
/// One condition entry in a topic status.
///
/// All fields are required strings; field names are (de)serialized in
/// camelCase (`type`, `status`, `reason`, `message`, `lastTransitionTime`).
// NOTE(review): the shape resembles a Kubernetes-style condition — confirm
// the expected values for `type` and `status` against the controller.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TopicCondition {
    /// Condition type (raw identifier because `type` is a Rust keyword).
    pub r#type: String,

    /// Condition status, as a string.
    pub status: String,

    /// Reason associated with the condition.
    pub reason: String,

    /// Message associated with the condition.
    pub message: String,

    /// Time of the last transition, as a string; the format is not enforced here.
    pub last_transition_time: String,
}
1510
/// Details for a single topic partition.
///
/// All fields are required; field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PartitionInfo {
    /// Partition number.
    pub partition: i32,

    /// Leader of the partition.
    // NOTE(review): presumably a broker id — confirm against the producer of this status.
    pub leader: i32,

    /// Replica list for the partition (same id space as `leader`, presumably).
    pub replicas: Vec<i32>,

    /// In-sync replica list for the partition.
    pub isr: Vec<i32>,
}
1528
1529#[allow(dead_code)]
1531#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1532#[serde(rename_all = "camelCase")]
1533pub struct ConnectConfigSpec {
1534 #[serde(default = "default_state_dir")]
1536 pub state_dir: String,
1537
1538 #[serde(default = "default_log_level")]
1540 #[validate(custom(function = "validate_log_level"))]
1541 pub log_level: String,
1542}
1543
/// Default connect state directory: `/data/connect-state`.
#[allow(dead_code)]
fn default_state_dir() -> String {
    String::from("/data/connect-state")
}
1548
/// Default log level: `"info"`.
#[allow(dead_code)]
fn default_log_level() -> String {
    String::from("info")
}
1553
1554#[allow(dead_code)]
1555fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1556 match level {
1557 "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1558 _ => Err(ValidationError::new("invalid_log_level")
1559 .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1560 }
1561}
1562
/// Specification of one source connector.
///
/// Carries the common connector fields, one optional typed configuration
/// block per connector kind, and a free-form JSON `config`. Field names are
/// (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceConnectorSpec {
    /// Connector instance name; 1-63 characters, checked by `validate_k8s_name`.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type; 1-64 characters, checked by `validate_connector_type`.
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Topic name; 1-255 characters.
    #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
    pub topic: String,

    /// Optional topic routing string; not validated here.
    #[serde(default)]
    pub topic_routing: Option<String>,

    /// Whether the connector is enabled; falls back to `default_true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Optional PostgreSQL CDC settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub postgres_cdc: Option<PostgresCdcConfig>,

    /// Optional MySQL CDC settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub mysql_cdc: Option<MysqlCdcConfig>,

    /// Optional HTTP source settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub http: Option<HttpSourceConfig>,

    /// Optional data-generator settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub datagen: Option<DatagenConfig>,

    /// Optional Kafka source settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub kafka: Option<KafkaSourceConfig>,

    /// Optional MQTT source settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub mqtt: Option<MqttSourceConfig>,

    /// Optional SQS source settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub sqs: Option<SqsSourceConfig>,

    /// Optional Pub/Sub source settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub pubsub: Option<PubSubSourceConfig>,

    /// Free-form JSON configuration; no validator is attached.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Optional secret reference; checked by `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Topic settings for this source; validated recursively.
    #[serde(default)]
    #[validate(nested)]
    pub topic_config: SourceTopicConfigSpec,
}
1652
1653#[allow(dead_code)]
1654fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1655 static CONNECTOR_REGEX: Lazy<Regex> =
1657 Lazy::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1658 if !CONNECTOR_REGEX.is_match(connector) {
1659 return Err(ValidationError::new("invalid_connector_type")
1660 .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1661 }
1662 Ok(())
1663}
1664
/// Selection of one table for a CDC source.
///
/// Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct TableSpec {
    /// Optional schema name; not validated here.
    #[serde(default)]
    pub schema: Option<String>,

    /// Table name; required, 1-128 characters.
    #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
    pub table: String,

    /// Optional topic name for this table; not validated here.
    #[serde(default)]
    pub topic: Option<String>,

    /// Column list; at most 500 entries.
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 columns per table"))]
    pub columns: Vec<String>,

    /// Excluded column list; at most 500 entries.
    #[serde(default)]
    #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
    pub exclude_columns: Vec<String>,

    /// Column masks as a string-to-string map; not validated here.
    // NOTE(review): presumably column name -> mask rule; confirm against the consumer.
    #[serde(default)]
    pub column_masks: std::collections::BTreeMap<String, String>,
}
1696
/// Typed settings for a PostgreSQL CDC source.
///
/// Every field is optional or defaults to empty. Field names are
/// (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PostgresCdcConfig {
    /// Optional slot name; at most 63 characters.
    #[serde(default)]
    #[validate(length(max = 63, message = "slot name max 63 characters"))]
    pub slot_name: Option<String>,

    /// Optional publication name; at most 63 characters.
    #[serde(default)]
    #[validate(length(max = 63, message = "publication name max 63 characters"))]
    pub publication: Option<String>,

    /// Optional snapshot mode; checked by `validate_snapshot_mode`.
    #[serde(default)]
    #[validate(custom(function = "validate_snapshot_mode"))]
    pub snapshot_mode: Option<String>,

    /// Optional decoding plugin; checked by `validate_decoding_plugin`.
    #[serde(default)]
    #[validate(custom(function = "validate_decoding_plugin"))]
    pub decoding_plugin: Option<String>,

    /// Optional flag for including transaction metadata.
    #[serde(default)]
    pub include_transaction_metadata: Option<bool>,

    /// Optional heartbeat interval in milliseconds; 0-3600000.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 3600000,
        message = "heartbeat interval must be 0-3600000ms"
    ))]
    pub heartbeat_interval_ms: Option<i64>,

    /// Optional signal table name; not validated here.
    #[serde(default)]
    pub signal_table: Option<String>,

    /// Tables to capture; at most 100 entries.
    // NOTE(review): only the list length is checked; no `nested` validator is
    // attached, so the per-field rules on `TableSpec` do not appear to run
    // through this field — confirm whether that is intentional.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 tables per source"))]
    pub tables: Vec<TableSpec>,
}
1750
1751#[allow(dead_code)]
1752fn validate_snapshot_mode(mode: &str) -> Result<(), ValidationError> {
1753 match mode {
1754 "" | "initial" | "never" | "when_needed" | "exported" | "custom" => Ok(()),
1755 _ => Err(ValidationError::new("invalid_snapshot_mode").with_message(
1756 "snapshot mode must be: initial, never, when_needed, exported, or custom".into(),
1757 )),
1758 }
1759}
1760
1761#[allow(dead_code)]
1762fn validate_decoding_plugin(plugin: &str) -> Result<(), ValidationError> {
1763 match plugin {
1764 "" | "pgoutput" | "wal2json" | "decoderbufs" => Ok(()),
1765 _ => Err(ValidationError::new("invalid_decoding_plugin")
1766 .with_message("decoding plugin must be: pgoutput, wal2json, or decoderbufs".into())),
1767 }
1768}
1769
/// Typed settings for a MySQL CDC source.
///
/// Every field is optional or defaults to empty. Field names are
/// (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MysqlCdcConfig {
    /// Optional server ID; 1-4294967295.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 4294967295_i64,
        message = "server ID must be 1-4294967295"
    ))]
    pub server_id: Option<i64>,

    /// Optional snapshot mode; checked by `validate_mysql_snapshot_mode`.
    #[serde(default)]
    #[validate(custom(function = "validate_mysql_snapshot_mode"))]
    pub snapshot_mode: Option<String>,

    /// Optional flag for including GTID information.
    #[serde(default)]
    pub include_gtid: Option<bool>,

    /// Optional heartbeat interval in milliseconds; 0-3600000.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 3600000,
        message = "heartbeat interval must be 0-3600000ms"
    ))]
    pub heartbeat_interval_ms: Option<i64>,

    /// Optional database history topic name; not validated here.
    #[serde(default)]
    pub database_history_topic: Option<String>,

    /// Tables to capture; at most 100 entries.
    // NOTE(review): only the list length is checked; no `nested` validator is
    // attached, so the per-field rules on `TableSpec` do not appear to run
    // through this field — confirm whether that is intentional.
    #[serde(default)]
    #[validate(length(max = 100, message = "maximum 100 tables per source"))]
    pub tables: Vec<TableSpec>,
}
1811
1812#[allow(dead_code)]
1813fn validate_mysql_snapshot_mode(mode: &str) -> Result<(), ValidationError> {
1814 match mode {
1815 "" | "initial" | "never" | "when_needed" | "schema_only" => Ok(()),
1816 _ => Err(
1817 ValidationError::new("invalid_mysql_snapshot_mode").with_message(
1818 "snapshot mode must be: initial, never, when_needed, or schema_only".into(),
1819 ),
1820 ),
1821 }
1822}
1823
/// Typed settings for an HTTP source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HttpSourceConfig {
    /// Optional listen address; not validated here.
    #[serde(default)]
    pub listen_address: Option<String>,

    /// Optional port; 1-65535.
    #[serde(default)]
    #[validate(range(min = 1, max = 65535, message = "port must be 1-65535"))]
    pub port: Option<i32>,

    /// Optional path prefix; not validated here.
    #[serde(default)]
    pub path_prefix: Option<String>,

    /// Optional flag for requiring authentication.
    #[serde(default)]
    pub require_auth: Option<bool>,

    /// Optional authentication secret reference; not validated here.
    #[serde(default)]
    pub auth_secret_ref: Option<String>,
}
1850
/// Typed settings for the data-generator source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct DatagenConfig {
    /// Optional event rate per second; 1-1000000.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 1000000,
        message = "events per second must be 1-1000000"
    ))]
    pub events_per_second: Option<i64>,

    /// Optional maximum number of events; no range is enforced here.
    #[serde(default)]
    pub max_events: Option<i64>,

    /// Optional schema type; not validated here.
    #[serde(default)]
    pub schema_type: Option<String>,

    /// Optional seed value.
    #[serde(default)]
    pub seed: Option<i64>,
}
1877
/// Typed settings for a Kafka source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct KafkaSourceConfig {
    /// Optional broker list; not validated here.
    #[serde(default)]
    pub brokers: Option<Vec<String>>,

    /// Optional topic name; not validated here.
    #[serde(default)]
    pub topic: Option<String>,

    /// Optional consumer group; not validated here.
    #[serde(default)]
    pub consumer_group: Option<String>,

    /// Optional start offset; checked by `validate_kafka_start_offset`.
    #[serde(default)]
    #[validate(custom(function = "validate_kafka_start_offset"))]
    pub start_offset: Option<String>,

    /// Optional security protocol; checked by `validate_kafka_security_protocol`.
    #[serde(default)]
    #[validate(custom(function = "validate_kafka_security_protocol"))]
    pub security_protocol: Option<String>,

    /// Optional SASL mechanism; not validated here.
    #[serde(default)]
    pub sasl_mechanism: Option<String>,

    /// Optional SASL username; not validated here.
    #[serde(default)]
    pub sasl_username: Option<String>,
}
1917
1918#[allow(dead_code)]
1919fn validate_kafka_start_offset(offset: &str) -> Result<(), ValidationError> {
1920 match offset {
1921 "" | "earliest" | "latest" => Ok(()),
1922 _ => Err(ValidationError::new("invalid_kafka_start_offset")
1923 .with_message("start offset must be: earliest or latest".into())),
1924 }
1925}
1926
1927#[allow(dead_code)]
1928fn validate_kafka_security_protocol(protocol: &str) -> Result<(), ValidationError> {
1929 match protocol {
1930 "" | "plaintext" | "ssl" | "sasl_plaintext" | "sasl_ssl" => Ok(()),
1931 _ => Err(ValidationError::new("invalid_kafka_security_protocol")
1932 .with_message("protocol must be: plaintext, ssl, sasl_plaintext, or sasl_ssl".into())),
1933 }
1934}
1935
/// Typed settings for an MQTT source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct MqttSourceConfig {
    /// Optional broker URL; not validated here.
    #[serde(default)]
    pub broker_url: Option<String>,

    /// Optional topic list; not validated here.
    #[serde(default)]
    pub topics: Option<Vec<String>>,

    /// Optional client ID; not validated here.
    #[serde(default)]
    pub client_id: Option<String>,

    /// Optional quality-of-service level; checked by `validate_mqtt_qos`.
    #[serde(default)]
    #[validate(custom(function = "validate_mqtt_qos"))]
    pub qos: Option<String>,

    /// Optional clean-session flag.
    #[serde(default)]
    pub clean_session: Option<bool>,

    /// Optional MQTT version string; not validated here.
    #[serde(default)]
    pub mqtt_version: Option<String>,

    /// Optional username; not validated here.
    #[serde(default)]
    pub username: Option<String>,
}
1970
1971#[allow(dead_code)]
1972fn validate_mqtt_qos(qos: &str) -> Result<(), ValidationError> {
1973 match qos {
1974 "" | "at_most_once" | "at_least_once" | "exactly_once" => Ok(()),
1975 _ => Err(ValidationError::new("invalid_mqtt_qos")
1976 .with_message("QoS must be: at_most_once, at_least_once, or exactly_once".into())),
1977 }
1978}
1979
/// Typed settings for an SQS source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SqsSourceConfig {
    /// Optional queue URL; not validated here.
    #[serde(default)]
    pub queue_url: Option<String>,

    /// Optional region; not validated here.
    #[serde(default)]
    pub region: Option<String>,

    /// Optional maximum messages; 1-10.
    #[serde(default)]
    #[validate(range(min = 1, max = 10, message = "max messages must be 1-10"))]
    pub max_messages: Option<i32>,

    /// Optional wait time in seconds; 0-20.
    #[serde(default)]
    #[validate(range(min = 0, max = 20, message = "wait time must be 0-20 seconds"))]
    pub wait_time_seconds: Option<i32>,

    /// Optional visibility timeout in seconds; 0-43200.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 43200,
        message = "visibility timeout must be 0-43200 seconds"
    ))]
    pub visibility_timeout: Option<i32>,

    /// Optional AWS profile name; not validated here.
    #[serde(default)]
    pub aws_profile: Option<String>,

    /// Optional role ARN; not validated here.
    #[serde(default)]
    pub role_arn: Option<String>,
}
2020
/// Typed settings for a Pub/Sub source.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct PubSubSourceConfig {
    /// Optional project ID; not validated here.
    #[serde(default)]
    pub project_id: Option<String>,

    /// Optional subscription name; not validated here.
    #[serde(default)]
    pub subscription: Option<String>,

    /// Optional topic name; not validated here.
    #[serde(default)]
    pub topic: Option<String>,

    /// Optional maximum messages; 1-1000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "max messages must be 1-1000"))]
    pub max_messages: Option<i32>,

    /// Optional acknowledgement deadline in seconds; 10-600.
    #[serde(default)]
    #[validate(range(min = 10, max = 600, message = "ack deadline must be 10-600 seconds"))]
    pub ack_deadline_seconds: Option<i32>,

    /// Optional credentials file path; not validated here.
    #[serde(default)]
    pub credentials_file: Option<String>,

    /// Optional `use_adc` flag.
    // NOTE(review): presumably "Application Default Credentials" — confirm.
    #[serde(default)]
    pub use_adc: Option<bool>,
}
2056
/// Typed settings for an S3 sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct S3SinkConfig {
    /// Optional bucket name; at most 63 characters.
    #[serde(default)]
    #[validate(length(max = 63, message = "bucket name max 63 characters"))]
    pub bucket: Option<String>,

    /// Optional region; not validated here.
    #[serde(default)]
    pub region: Option<String>,

    /// Optional endpoint URL; not validated here.
    #[serde(default)]
    pub endpoint_url: Option<String>,

    /// Optional key prefix; not validated here.
    #[serde(default)]
    pub prefix: Option<String>,

    /// Optional output format; checked by `validate_output_format`.
    #[serde(default)]
    #[validate(custom(function = "validate_output_format"))]
    pub format: Option<String>,

    /// Optional compression codec; checked by `validate_s3_compression`.
    #[serde(default)]
    #[validate(custom(function = "validate_s3_compression"))]
    pub compression: Option<String>,

    /// Optional batch size; 1-1000000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
    pub batch_size: Option<i64>,

    /// Optional flush interval in seconds; 1-86400.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 86400,
        message = "flush interval must be 1-86400 seconds"
    ))]
    pub flush_interval_seconds: Option<i64>,
}
2103
2104#[allow(dead_code)]
2105fn validate_output_format(format: &str) -> Result<(), ValidationError> {
2106 match format {
2107 "" | "json" | "jsonl" | "parquet" | "avro" => Ok(()),
2108 _ => Err(ValidationError::new("invalid_output_format")
2109 .with_message("format must be: json, jsonl, parquet, or avro".into())),
2110 }
2111}
2112
2113#[allow(dead_code)]
2114fn validate_s3_compression(compression: &str) -> Result<(), ValidationError> {
2115 match compression {
2116 "" | "none" | "gzip" | "snappy" | "lz4" | "zstd" => Ok(()),
2117 _ => Err(ValidationError::new("invalid_compression")
2118 .with_message("compression must be: none, gzip, snappy, lz4, or zstd".into())),
2119 }
2120}
2121
/// Typed settings for an HTTP sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct HttpSinkConfig {
    /// Optional target URL; not validated here.
    #[serde(default)]
    pub url: Option<String>,

    /// Optional HTTP method; checked by `validate_http_method`.
    #[serde(default)]
    #[validate(custom(function = "validate_http_method"))]
    pub method: Option<String>,

    /// Optional content type; not validated here.
    #[serde(default)]
    pub content_type: Option<String>,

    /// Optional timeout in milliseconds; 100-300000.
    #[serde(default)]
    #[validate(range(min = 100, max = 300000, message = "timeout must be 100-300000ms"))]
    pub timeout_ms: Option<i64>,

    /// Optional batch size; 1-10000.
    #[serde(default)]
    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
    pub batch_size: Option<i64>,
}
2150
2151#[allow(dead_code)]
2152fn validate_http_method(method: &str) -> Result<(), ValidationError> {
2153 match method {
2154 "" | "POST" | "PUT" | "PATCH" => Ok(()),
2155 _ => Err(ValidationError::new("invalid_http_method")
2156 .with_message("HTTP method must be: POST, PUT, or PATCH".into())),
2157 }
2158}
2159
/// Typed settings for a stdout sink.
///
/// Every field is optional and none carries a validator. Field names are
/// (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct StdoutSinkConfig {
    /// Optional output format.
    #[serde(default)]
    pub format: Option<String>,

    /// Optional pretty-printing flag.
    #[serde(default)]
    pub pretty: Option<bool>,

    /// Optional flag for including metadata.
    #[serde(default)]
    pub include_metadata: Option<bool>,
}
2177
/// Typed settings for a Kafka sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct KafkaSinkConfig {
    /// Optional broker list; not validated here.
    #[serde(default)]
    pub brokers: Option<Vec<String>>,

    /// Optional topic name; not validated here.
    #[serde(default)]
    pub topic: Option<String>,

    /// Optional acknowledgement setting; checked by `validate_kafka_acks`.
    #[serde(default)]
    #[validate(custom(function = "validate_kafka_acks"))]
    pub acks: Option<String>,

    /// Optional compression codec; not validated here.
    #[serde(default)]
    pub compression: Option<String>,

    /// Optional batch size in bytes; 0-1048576.
    #[serde(default)]
    #[validate(range(min = 0, max = 1048576, message = "batch size must be 0-1048576 bytes"))]
    pub batch_size: Option<i64>,

    /// Optional linger time in milliseconds; no range is enforced here.
    #[serde(default)]
    pub linger_ms: Option<i64>,

    /// Optional security protocol; not validated here.
    #[serde(default)]
    pub security_protocol: Option<String>,

    /// Optional SASL mechanism; not validated here.
    #[serde(default)]
    pub sasl_mechanism: Option<String>,

    /// Optional SASL username; not validated here.
    #[serde(default)]
    pub sasl_username: Option<String>,
}
2225
2226#[allow(dead_code)]
2227fn validate_kafka_acks(acks: &str) -> Result<(), ValidationError> {
2228 match acks {
2229 "" | "none" | "leader" | "all" | "0" | "1" | "-1" => Ok(()),
2230 _ => Err(ValidationError::new("invalid_kafka_acks")
2231 .with_message("acks must be: none, leader, all, 0, 1, or -1".into())),
2232 }
2233}
2234
/// Typed settings for a GCS sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct GcsSinkConfig {
    /// Optional bucket name; not validated here.
    #[serde(default)]
    pub bucket: Option<String>,

    /// Optional object prefix; not validated here.
    #[serde(default)]
    pub prefix: Option<String>,

    /// Optional output format; not validated here.
    #[serde(default)]
    pub format: Option<String>,

    /// Optional compression codec; not validated here.
    #[serde(default)]
    pub compression: Option<String>,

    /// Optional partitioning setting; not validated here.
    #[serde(default)]
    pub partitioning: Option<String>,

    /// Optional batch size; 1-1000000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
    pub batch_size: Option<i64>,

    /// Optional flush interval in seconds; 1-86400.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 86400,
        message = "flush interval must be 1-86400 seconds"
    ))]
    pub flush_interval_seconds: Option<i64>,

    /// Optional credentials file path; not validated here.
    #[serde(default)]
    pub credentials_file: Option<String>,

    /// Optional `use_adc` flag.
    // NOTE(review): presumably "Application Default Credentials" — confirm.
    #[serde(default)]
    pub use_adc: Option<bool>,
}
2286
/// Typed settings for an Azure Blob sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct AzureBlobSinkConfig {
    /// Optional storage account name; not validated here.
    #[serde(default)]
    pub account_name: Option<String>,

    /// Optional container name; not validated here.
    #[serde(default)]
    pub container: Option<String>,

    /// Optional blob prefix; not validated here.
    #[serde(default)]
    pub prefix: Option<String>,

    /// Optional output format; not validated here.
    #[serde(default)]
    pub format: Option<String>,

    /// Optional compression codec; not validated here.
    #[serde(default)]
    pub compression: Option<String>,

    /// Optional partitioning setting; not validated here.
    #[serde(default)]
    pub partitioning: Option<String>,

    /// Optional batch size; 1-1000000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000000, message = "batch size must be 1-1000000"))]
    pub batch_size: Option<i64>,

    /// Optional flush interval in seconds; 1-86400.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 86400,
        message = "flush interval must be 1-86400 seconds"
    ))]
    pub flush_interval_seconds: Option<i64>,
}
2330
/// Typed settings for a Snowflake sink.
///
/// Every field is optional; only `batch_size` carries a validator. Field
/// names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SnowflakeSinkConfig {
    /// Optional account identifier.
    #[serde(default)]
    pub account: Option<String>,

    /// Optional user name.
    #[serde(default)]
    pub user: Option<String>,

    /// Optional private key path.
    #[serde(default)]
    pub private_key_path: Option<String>,

    /// Optional database name.
    #[serde(default)]
    pub database: Option<String>,

    /// Optional schema name.
    #[serde(default)]
    pub schema: Option<String>,

    /// Optional table name.
    #[serde(default)]
    pub table: Option<String>,

    /// Optional warehouse name.
    #[serde(default)]
    pub warehouse: Option<String>,

    /// Optional role name.
    #[serde(default)]
    pub role: Option<String>,

    /// Optional batch size; 1-100000.
    #[serde(default)]
    #[validate(range(min = 1, max = 100000, message = "batch size must be 1-100000"))]
    pub batch_size: Option<i64>,
}
2377
/// Typed settings for a BigQuery sink.
///
/// Every field is optional; only `batch_size` carries a validator. Field
/// names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct BigQuerySinkConfig {
    /// Optional project ID.
    #[serde(default)]
    pub project_id: Option<String>,

    /// Optional dataset ID.
    #[serde(default)]
    pub dataset_id: Option<String>,

    /// Optional table ID.
    #[serde(default)]
    pub table_id: Option<String>,

    /// Optional credentials file path.
    #[serde(default)]
    pub credentials_file: Option<String>,

    /// Optional `use_adc` flag.
    // NOTE(review): presumably "Application Default Credentials" — confirm.
    #[serde(default)]
    pub use_adc: Option<bool>,

    /// Optional batch size; 1-10000.
    #[serde(default)]
    #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
    pub batch_size: Option<i64>,

    /// Optional flag for creating the table automatically.
    #[serde(default)]
    pub auto_create_table: Option<bool>,
}
2412
/// Typed settings for a Redshift sink.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RedshiftSinkConfig {
    /// Optional host name; not validated here.
    #[serde(default)]
    pub host: Option<String>,

    /// Optional port; 1-65535.
    #[serde(default)]
    #[validate(range(min = 1, max = 65535, message = "port must be 1-65535"))]
    pub port: Option<i32>,

    /// Optional database name; not validated here.
    #[serde(default)]
    pub database: Option<String>,

    /// Optional user name; not validated here.
    #[serde(default)]
    pub user: Option<String>,

    /// Optional schema name; not validated here.
    #[serde(default)]
    pub schema: Option<String>,

    /// Optional table name; not validated here.
    #[serde(default)]
    pub table: Option<String>,

    /// Optional SSL mode; checked by `validate_redshift_ssl_mode`.
    #[serde(default)]
    #[validate(custom(function = "validate_redshift_ssl_mode"))]
    pub ssl_mode: Option<String>,

    /// Optional batch size; 1-100000.
    #[serde(default)]
    #[validate(range(min = 1, max = 100000, message = "batch size must be 1-100000"))]
    pub batch_size: Option<i64>,
}
2453
2454#[allow(dead_code)]
2455fn validate_redshift_ssl_mode(mode: &str) -> Result<(), ValidationError> {
2456 match mode {
2457 "" | "disable" | "prefer" | "require" | "verify-ca" | "verify-full" => Ok(()),
2458 _ => Err(ValidationError::new("invalid_ssl_mode").with_message(
2459 "SSL mode must be: disable, prefer, require, verify-ca, or verify-full".into(),
2460 )),
2461 }
2462}
2463
/// Per-source topic settings.
///
/// Every field is optional. Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SourceTopicConfigSpec {
    /// Optional partition count; 1-1000.
    #[serde(default)]
    #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
    pub partitions: Option<i32>,

    /// Optional replication factor; 1-10.
    #[serde(default)]
    #[validate(range(
        min = 1,
        max = 10,
        message = "replication factor must be between 1 and 10"
    ))]
    pub replication_factor: Option<i32>,

    /// Optional auto-create flag.
    #[serde(default)]
    pub auto_create: Option<bool>,
}
2487
/// Specification of one sink connector.
///
/// Carries the common connector fields, one optional typed configuration
/// block per sink kind, a free-form JSON `config`, and rate limiting. Field
/// names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct SinkConnectorSpec {
    /// Connector instance name; 1-63 characters, checked by `validate_k8s_name`.
    #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
    #[validate(custom(function = "validate_k8s_name"))]
    pub name: String,

    /// Connector type; 1-64 characters, checked by `validate_connector_type`.
    #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
    #[validate(custom(function = "validate_connector_type"))]
    pub connector: String,

    /// Topic list; 1-100 entries. Individual names are not validated here.
    #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
    pub topics: Vec<String>,

    /// Consumer group; 1-128 characters.
    #[validate(length(
        min = 1,
        max = 128,
        message = "consumer group must be 1-128 characters"
    ))]
    pub consumer_group: String,

    /// Whether the connector is enabled; falls back to `default_true` when omitted.
    #[serde(default = "default_true")]
    pub enabled: bool,

    /// Start offset; falls back to `default_start_offset` when omitted and is
    /// checked by `validate_start_offset`.
    #[serde(default = "default_start_offset")]
    #[validate(custom(function = "validate_start_offset"))]
    pub start_offset: String,

    /// Optional S3 sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub s3: Option<S3SinkConfig>,

    /// Optional HTTP sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub http: Option<HttpSinkConfig>,

    /// Optional stdout sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub stdout: Option<StdoutSinkConfig>,

    /// Optional Kafka sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub kafka: Option<KafkaSinkConfig>,

    /// Optional GCS sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub gcs: Option<GcsSinkConfig>,

    /// Optional Azure Blob sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub azure_blob: Option<AzureBlobSinkConfig>,

    /// Optional Snowflake sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub snowflake: Option<SnowflakeSinkConfig>,

    /// Optional BigQuery sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub bigquery: Option<BigQuerySinkConfig>,

    /// Optional Redshift sink settings; validated recursively when present.
    #[serde(default)]
    #[validate(nested)]
    pub redshift: Option<RedshiftSinkConfig>,

    /// Free-form JSON configuration; no validator is attached.
    #[serde(default)]
    pub config: serde_json::Value,

    /// Optional secret reference; checked by `validate_optional_k8s_name`.
    #[serde(default)]
    #[validate(custom(function = "validate_optional_k8s_name"))]
    pub config_secret_ref: Option<String>,

    /// Rate limiting settings; validated recursively.
    #[serde(default)]
    #[validate(nested)]
    pub rate_limit: RateLimitSpec,
}
2591
/// Default sink start offset: `"latest"`.
#[allow(dead_code)]
fn default_start_offset() -> String {
    String::from("latest")
}
2596
2597#[allow(dead_code)]
2598fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
2599 match offset {
2600 "earliest" | "latest" => Ok(()),
2601 s if s.contains('T') && s.contains(':') => Ok(()), _ => Err(ValidationError::new("invalid_start_offset").with_message(
2603 "start offset must be 'earliest', 'latest', or ISO 8601 timestamp".into(),
2604 )),
2605 }
2606}
2607
/// Rate limiting settings for a sink connector.
///
/// Field names are (de)serialized in camelCase.
#[allow(dead_code)]
#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
#[serde(rename_all = "camelCase")]
pub struct RateLimitSpec {
    /// Events per second; at most 1000000. Deserializes to `0` when omitted.
    // NOTE(review): `0` presumably means "no limit" — confirm against the consumer.
    // The `min = 0` bound is always satisfied because the field is unsigned.
    #[serde(default)]
    #[validate(range(
        min = 0,
        max = 1_000_000,
        message = "events per second must be 0-1000000"
    ))]
    pub events_per_second: u64,

    /// Optional burst capacity; at most 100000.
    #[serde(default)]
    #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
    pub burst_capacity: Option<u64>,
}
2627
2628#[allow(dead_code)]
2630#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2631#[serde(rename_all = "camelCase")]
2632pub struct GlobalConnectSettings {
2633 #[serde(default)]
2635 #[validate(nested)]
2636 pub topic: TopicSettingsSpec,
2637
2638 #[serde(default)]
2640 #[validate(nested)]
2641 pub retry: RetryConfigSpec,
2642
2643 #[serde(default)]
2645 #[validate(nested)]
2646 pub health: HealthConfigSpec,
2647
2648 #[serde(default)]
2650 #[validate(nested)]
2651 pub metrics: ConnectMetricsSpec,
2652}
2653
2654#[allow(dead_code)]
2656#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2657#[serde(rename_all = "camelCase")]
2658pub struct TopicSettingsSpec {
2659 #[serde(default = "default_true")]
2661 pub auto_create: bool,
2662
2663 #[serde(default = "default_topic_partitions")]
2665 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
2666 pub default_partitions: i32,
2667
2668 #[serde(default = "default_topic_replication")]
2670 #[validate(range(
2671 min = 1,
2672 max = 10,
2673 message = "replication factor must be between 1 and 10"
2674 ))]
2675 pub default_replication_factor: i32,
2676
2677 #[serde(default = "default_true")]
2679 pub require_topic_exists: bool,
2680}
2681
/// Serde default for `TopicSettingsSpec::default_partitions`.
#[allow(dead_code)]
fn default_topic_partitions() -> i32 {
    const DEFAULT_PARTITIONS: i32 = 1;
    DEFAULT_PARTITIONS
}
2686
/// Serde default for `TopicSettingsSpec::default_replication_factor`.
#[allow(dead_code)]
fn default_topic_replication() -> i32 {
    const DEFAULT_REPLICATION_FACTOR: i32 = 1;
    DEFAULT_REPLICATION_FACTOR
}
2691
2692impl Default for TopicSettingsSpec {
2693 fn default() -> Self {
2694 Self {
2695 auto_create: true,
2696 default_partitions: 1,
2697 default_replication_factor: 1,
2698 require_topic_exists: true,
2699 }
2700 }
2701}
2702
2703#[allow(dead_code)]
2705#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2706#[serde(rename_all = "camelCase")]
2707pub struct RetryConfigSpec {
2708 #[serde(default = "default_max_retries")]
2710 #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2711 pub max_retries: i32,
2712
2713 #[serde(default = "default_initial_backoff_ms")]
2715 #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
2716 pub initial_backoff_ms: i64,
2717
2718 #[serde(default = "default_max_backoff_ms")]
2720 #[validate(range(
2721 min = 100,
2722 max = 3600000,
2723 message = "max backoff must be 100-3600000ms"
2724 ))]
2725 pub max_backoff_ms: i64,
2726
2727 #[serde(default = "default_backoff_multiplier")]
2729 pub backoff_multiplier: f64,
2730}
2731
/// Serde default for `RetryConfigSpec::max_retries`.
#[allow(dead_code)]
fn default_max_retries() -> i32 {
    const DEFAULT_MAX_RETRIES: i32 = 10;
    DEFAULT_MAX_RETRIES
}
2736
/// Serde default for `RetryConfigSpec::initial_backoff_ms` (milliseconds).
#[allow(dead_code)]
fn default_initial_backoff_ms() -> i64 {
    const DEFAULT_INITIAL_BACKOFF_MS: i64 = 100;
    DEFAULT_INITIAL_BACKOFF_MS
}
2741
/// Serde default for `RetryConfigSpec::max_backoff_ms` (milliseconds).
#[allow(dead_code)]
fn default_max_backoff_ms() -> i64 {
    const DEFAULT_MAX_BACKOFF_MS: i64 = 30_000;
    DEFAULT_MAX_BACKOFF_MS
}
2746
/// Serde default for `RetryConfigSpec::backoff_multiplier`.
#[allow(dead_code)]
fn default_backoff_multiplier() -> f64 {
    const DEFAULT_BACKOFF_MULTIPLIER: f64 = 2.0;
    DEFAULT_BACKOFF_MULTIPLIER
}
2751
2752impl Default for RetryConfigSpec {
2753 fn default() -> Self {
2754 Self {
2755 max_retries: 10,
2756 initial_backoff_ms: 100,
2757 max_backoff_ms: 30000,
2758 backoff_multiplier: 2.0,
2759 }
2760 }
2761}
2762
2763#[allow(dead_code)]
2765#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2766#[serde(rename_all = "camelCase")]
2767pub struct HealthConfigSpec {
2768 #[serde(default)]
2770 pub enabled: bool,
2771
2772 #[serde(default = "default_health_port")]
2774 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
2775 pub port: i32,
2776
2777 #[serde(default = "default_health_path")]
2779 pub path: String,
2780}
2781
/// Serde default for `HealthConfigSpec::port`.
#[allow(dead_code)]
fn default_health_port() -> i32 {
    const DEFAULT_HEALTH_PORT: i32 = 8080;
    DEFAULT_HEALTH_PORT
}
2786
/// Serde default for `HealthConfigSpec::path`.
#[allow(dead_code)]
fn default_health_path() -> String {
    String::from("/health")
}
2791
2792impl Default for HealthConfigSpec {
2793 fn default() -> Self {
2794 Self {
2795 enabled: false,
2796 port: 8080,
2797 path: "/health".to_string(),
2798 }
2799 }
2800}
2801
2802#[allow(dead_code)]
2804#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2805#[serde(rename_all = "camelCase")]
2806pub struct ConnectMetricsSpec {
2807 #[serde(default)]
2809 pub enabled: bool,
2810
2811 #[serde(default = "default_connect_metrics_port")]
2813 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
2814 pub port: i32,
2815}
2816
/// Serde default for `ConnectMetricsSpec::port`.
#[allow(dead_code)]
fn default_connect_metrics_port() -> i32 {
    const DEFAULT_METRICS_PORT: i32 = 9091;
    DEFAULT_METRICS_PORT
}
2821
2822impl Default for ConnectMetricsSpec {
2823 fn default() -> Self {
2824 Self {
2825 enabled: false,
2826 port: 9091,
2827 }
2828 }
2829}
2830
2831#[allow(dead_code)]
2833#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2834#[serde(rename_all = "camelCase")]
2835pub struct ConnectTlsSpec {
2836 #[serde(default)]
2838 pub enabled: bool,
2839
2840 #[serde(default)]
2842 #[validate(custom(function = "validate_optional_k8s_name"))]
2843 pub cert_secret_name: Option<String>,
2844
2845 #[serde(default)]
2847 pub mtls_enabled: bool,
2848
2849 #[serde(default)]
2851 #[validate(custom(function = "validate_optional_k8s_name"))]
2852 pub ca_secret_name: Option<String>,
2853
2854 #[serde(default)]
2856 pub insecure: bool,
2857}
2858
/// Returns the default replica count for a connect deployment.
#[allow(dead_code)]
fn default_connect_replicas() -> i32 {
    const DEFAULT_REPLICAS: i32 = 1;
    DEFAULT_REPLICAS
}
2863
2864#[allow(dead_code)]
2866#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
2867#[serde(rename_all = "camelCase")]
2868pub struct RivvenConnectStatus {
2869 pub phase: ConnectPhase,
2871
2872 pub replicas: i32,
2874
2875 pub ready_replicas: i32,
2877
2878 pub sources_running: i32,
2880
2881 pub sinks_running: i32,
2883
2884 pub sources_total: i32,
2886
2887 pub sinks_total: i32,
2889
2890 pub observed_generation: i64,
2892
2893 #[serde(default)]
2895 pub conditions: Vec<ConnectCondition>,
2896
2897 #[serde(default)]
2899 pub connector_statuses: Vec<ConnectorStatus>,
2900
2901 pub last_updated: Option<String>,
2903
2904 pub message: Option<String>,
2906}
2907
2908#[allow(dead_code)]
2910#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
2911pub enum ConnectPhase {
2912 #[default]
2914 Pending,
2915 Starting,
2917 Running,
2919 Degraded,
2921 Failed,
2923 Terminating,
2925}
2926
2927#[allow(dead_code)]
2929#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2930#[serde(rename_all = "camelCase")]
2931pub struct ConnectCondition {
2932 #[serde(rename = "type")]
2934 pub condition_type: String,
2935
2936 pub status: String,
2938
2939 pub reason: Option<String>,
2941
2942 pub message: Option<String>,
2944
2945 pub last_transition_time: Option<String>,
2947}
2948
2949#[allow(dead_code)]
2951#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2952#[serde(rename_all = "camelCase")]
2953pub struct ConnectorStatus {
2954 pub name: String,
2956
2957 pub connector_type: String,
2959
2960 pub kind: String,
2962
2963 pub state: String,
2965
2966 pub events_processed: i64,
2968
2969 pub last_error: Option<String>,
2971
2972 pub last_success_time: Option<String>,
2974}
2975
2976#[allow(dead_code)]
2977impl RivvenConnectSpec {
2978 pub fn get_image(&self) -> String {
2980 if let Some(ref image) = self.image {
2981 image.clone()
2982 } else {
2983 format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2984 }
2985 }
2986
2987 pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2989 let mut labels = BTreeMap::new();
2990 labels.insert(
2991 "app.kubernetes.io/name".to_string(),
2992 "rivven-connect".to_string(),
2993 );
2994 labels.insert(
2995 "app.kubernetes.io/instance".to_string(),
2996 connect_name.to_string(),
2997 );
2998 labels.insert(
2999 "app.kubernetes.io/component".to_string(),
3000 "connector".to_string(),
3001 );
3002 labels.insert(
3003 "app.kubernetes.io/managed-by".to_string(),
3004 "rivven-operator".to_string(),
3005 );
3006 labels.insert(
3007 "app.kubernetes.io/version".to_string(),
3008 self.version.clone(),
3009 );
3010 labels
3011 }
3012
3013 pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
3015 let mut labels = BTreeMap::new();
3016 labels.insert(
3017 "app.kubernetes.io/name".to_string(),
3018 "rivven-connect".to_string(),
3019 );
3020 labels.insert(
3021 "app.kubernetes.io/instance".to_string(),
3022 connect_name.to_string(),
3023 );
3024 labels
3025 }
3026
3027 pub fn enabled_sources_count(&self) -> usize {
3029 self.sources.iter().filter(|s| s.enabled).count()
3030 }
3031
3032 pub fn enabled_sinks_count(&self) -> usize {
3034 self.sinks.iter().filter(|s| s.enabled).count()
3035 }
3036}
3037
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline cluster spec shared by the cluster tests: three replicas,
    /// version `0.0.1`, no custom image, and defaults everywhere else.
    /// Tests override individual fields with struct-update syntax.
    fn base_cluster_spec() -> RivvenClusterSpec {
        RivvenClusterSpec {
            replicas: 3,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            storage: StorageSpec::default(),
            resources: None,
            config: BrokerConfig::default(),
            tls: TlsSpec::default(),
            metrics: MetricsSpec::default(),
            affinity: None,
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            pod_disruption_budget: PdbSpec::default(),
            service_account: None,
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            liveness_probe: ProbeSpec::default(),
            readiness_probe: ProbeSpec::default(),
            security_context: None,
            container_security_context: None,
        }
    }

    // ---- Cluster spec ----

    #[test]
    fn test_default_spec() {
        let spec = base_cluster_spec();

        assert_eq!(spec.replicas, 3);
        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
    }

    #[test]
    fn test_get_labels() {
        let spec = base_cluster_spec();

        let labels = spec.get_labels("my-cluster");
        assert_eq!(
            labels.get("app.kubernetes.io/name"),
            Some(&"rivven".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/instance"),
            Some(&"my-cluster".to_string())
        );
    }

    #[test]
    fn test_custom_image() {
        let spec = RivvenClusterSpec {
            replicas: 1,
            image: Some("my-registry/rivven:custom".to_string()),
            image_pull_policy: "Always".to_string(),
            ..base_cluster_spec()
        };

        assert_eq!(spec.get_image(), "my-registry/rivven:custom");
    }

    #[test]
    fn test_cluster_phase_default() {
        let phase = ClusterPhase::default();
        assert_eq!(phase, ClusterPhase::Pending);
    }

    #[test]
    fn test_storage_spec_default() {
        let storage = StorageSpec::default();
        assert_eq!(storage.size, "10Gi");
        assert!(storage.storage_class_name.is_none());
    }

    #[test]
    fn test_broker_config_defaults() {
        let config = BrokerConfig::default();
        assert_eq!(config.default_partitions, 3);
        assert_eq!(config.default_replication_factor, 2);
        assert!(config.auto_create_topics);
    }

    #[test]
    fn test_probe_spec_defaults() {
        let probe = ProbeSpec::default();
        assert!(probe.enabled);
        assert_eq!(probe.initial_delay_seconds, 30);
        assert_eq!(probe.period_seconds, 10);
    }

    // ---- Core validators ----

    #[test]
    fn test_validate_quantity_valid() {
        assert!(validate_quantity("10Gi").is_ok());
        assert!(validate_quantity("100Mi").is_ok());
        assert!(validate_quantity("1Ti").is_ok());
        assert!(validate_quantity("500").is_ok());
        assert!(validate_quantity("1.5Gi").is_ok());
    }

    #[test]
    fn test_validate_quantity_invalid() {
        // "GB" is not one of the accepted suffixes.
        assert!(validate_quantity("10GB").is_err());
        // No leading digits.
        assert!(validate_quantity("abc").is_err());
        // A sign is not part of the accepted pattern.
        assert!(validate_quantity("-10Gi").is_err());
        // At least one digit is required.
        assert!(validate_quantity("").is_err());
    }

    #[test]
    fn test_validate_k8s_name_valid() {
        assert!(validate_k8s_name("my-cluster").is_ok());
        assert!(validate_k8s_name("cluster123").is_ok());
        assert!(validate_k8s_name("a").is_ok());
    }

    #[test]
    fn test_validate_k8s_name_invalid() {
        // Uppercase letters.
        assert!(validate_k8s_name("My-Cluster").is_err());
        // Leading hyphen.
        assert!(validate_k8s_name("-cluster").is_err());
        // Trailing hyphen.
        assert!(validate_k8s_name("cluster-").is_err());
        // Underscore.
        assert!(validate_k8s_name("cluster_name").is_err());
    }

    #[test]
    fn test_validate_compression_type() {
        assert!(validate_compression_type("lz4").is_ok());
        assert!(validate_compression_type("zstd").is_ok());
        assert!(validate_compression_type("none").is_ok());
        assert!(validate_compression_type("invalid").is_err());
    }

    #[test]
    fn test_validate_segment_size() {
        // 1 MiB
        assert!(validate_segment_size(1_048_576).is_ok());
        // 10 GiB
        assert!(validate_segment_size(10_737_418_240).is_ok());
        // 1 GiB
        assert!(validate_segment_size(1_073_741_824).is_ok());
        // Too small.
        assert!(validate_segment_size(1_000).is_err());
        // Too large.
        assert!(validate_segment_size(20_000_000_000).is_err());
    }

    #[test]
    fn test_validate_message_size() {
        // 1 KiB
        assert!(validate_message_size(1_024).is_ok());
        // 100 MiB
        assert!(validate_message_size(104_857_600).is_ok());
        // 1 MiB
        assert!(validate_message_size(1_048_576).is_ok());
        // Too small.
        assert!(validate_message_size(100).is_err());
        // Too large.
        assert!(validate_message_size(200_000_000).is_err());
    }

    #[test]
    fn test_validate_pull_policy() {
        assert!(validate_pull_policy("Always").is_ok());
        assert!(validate_pull_policy("IfNotPresent").is_ok());
        assert!(validate_pull_policy("Never").is_ok());
        // Matching is case-sensitive.
        assert!(validate_pull_policy("always").is_err());
        assert!(validate_pull_policy("Invalid").is_err());
    }

    #[test]
    fn test_validate_duration() {
        assert!(validate_duration("30s").is_ok());
        assert!(validate_duration("1m").is_ok());
        assert!(validate_duration("5m30s").is_ok());
        assert!(validate_duration("1h").is_ok());
        assert!(validate_duration("invalid").is_err());
        // A bare number without a unit is rejected.
        assert!(validate_duration("30").is_err());
    }

    #[test]
    fn test_validate_access_modes() {
        assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
        assert!(
            validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
                .is_ok()
        );
        assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
    }

    // ---- Connect spec ----

    #[test]
    fn test_connect_spec_defaults() {
        let spec = RivvenConnectSpec {
            cluster_ref: ClusterReference {
                name: "my-cluster".to_string(),
                namespace: None,
            },
            replicas: 1,
            version: "0.0.1".to_string(),
            image: None,
            image_pull_policy: "IfNotPresent".to_string(),
            image_pull_secrets: vec![],
            resources: None,
            config: ConnectConfigSpec::default(),
            sources: vec![],
            sinks: vec![],
            settings: GlobalConnectSettings::default(),
            tls: ConnectTlsSpec::default(),
            pod_annotations: BTreeMap::new(),
            pod_labels: BTreeMap::new(),
            env: vec![],
            node_selector: BTreeMap::new(),
            tolerations: vec![],
            affinity: None,
            service_account: None,
            security_context: None,
            container_security_context: None,
        };
        assert_eq!(spec.replicas, 1);

        // With no custom image, the image is derived from the version.
        assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-connect:0.0.1");

        // No sources or sinks were configured.
        assert_eq!(spec.enabled_sources_count(), 0);
        assert_eq!(spec.enabled_sinks_count(), 0);

        let labels = spec.get_labels("my-connect");
        assert_eq!(
            labels.get("app.kubernetes.io/name"),
            Some(&"rivven-connect".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/instance"),
            Some(&"my-connect".to_string())
        );
        assert_eq!(
            labels.get("app.kubernetes.io/version"),
            Some(&"0.0.1".to_string())
        );

        let selector = spec.get_selector_labels("my-connect");
        assert_eq!(selector.len(), 2);
        assert_eq!(
            selector.get("app.kubernetes.io/instance"),
            Some(&"my-connect".to_string())
        );
    }

    #[test]
    fn test_connect_phase_default() {
        let phase = ConnectPhase::default();
        assert_eq!(phase, ConnectPhase::Pending);
    }

    #[test]
    fn test_validate_connector_type() {
        assert!(validate_connector_type("postgres-cdc").is_ok());
        assert!(validate_connector_type("mysql-cdc").is_ok());
        assert!(validate_connector_type("http").is_ok());
        assert!(validate_connector_type("stdout").is_ok());
        assert!(validate_connector_type("s3").is_ok());
        assert!(validate_connector_type("datagen").is_ok());
        assert!(validate_connector_type("custom-connector").is_ok());
    }

    #[test]
    fn test_validate_start_offset() {
        assert!(validate_start_offset("earliest").is_ok());
        assert!(validate_start_offset("latest").is_ok());
        assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
        assert!(validate_start_offset("invalid").is_err());
    }

    // ---- Image, selector, annotation and label validators ----

    #[test]
    fn test_validate_image_valid() {
        assert!(validate_image("nginx").is_ok());
        assert!(validate_image("nginx:latest").is_ok());
        assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
        assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
        assert!(validate_image("localhost:5000/myimage").is_ok());
        // An empty image is accepted by the validator.
        assert!(validate_image("").is_ok());
    }

    #[test]
    fn test_validate_image_invalid() {
        // Starts with '/'.
        assert!(validate_image("/absolute/path").is_err());
        // Starts with '-'.
        assert!(validate_image("-invalid").is_err());
        // Contains "..".
        assert!(validate_image("image..path").is_err());
        // Longer than 255 characters.
        let long_name = "a".repeat(300);
        assert!(validate_image(&long_name).is_err());
    }

    #[test]
    fn test_validate_node_selector() {
        let mut selectors = BTreeMap::new();
        selectors.insert("node-type".to_string(), "compute".to_string());
        assert!(validate_node_selector(&selectors).is_ok());

        // 25 entries are expected to exceed the allowed count.
        let mut many = BTreeMap::new();
        for i in 0..25 {
            many.insert(format!("key-{}", i), "value".to_string());
        }
        assert!(validate_node_selector(&many).is_err());
    }

    #[test]
    fn test_validate_annotations() {
        let mut annotations = BTreeMap::new();
        annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
        assert!(validate_annotations(&annotations).is_ok());

        // 55 entries are expected to exceed the allowed count.
        let mut many = BTreeMap::new();
        for i in 0..55 {
            many.insert(format!("annotation-{}", i), "value".to_string());
        }
        assert!(validate_annotations(&many).is_err());
    }

    #[test]
    fn test_validate_labels() {
        let mut labels = BTreeMap::new();
        labels.insert("team".to_string(), "platform".to_string());
        assert!(validate_labels(&labels).is_ok());

        // A key under the app.kubernetes.io/ prefix is expected to be rejected.
        let mut reserved = BTreeMap::new();
        reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
        assert!(validate_labels(&reserved).is_err());
    }

    #[test]
    fn test_validate_raw_config() {
        let mut config = BTreeMap::new();
        config.insert("custom.setting".to_string(), "value".to_string());
        assert!(validate_raw_config(&config).is_ok());

        // The key "command" is expected to be rejected.
        let mut forbidden = BTreeMap::new();
        forbidden.insert("command".to_string(), "/bin/sh".to_string());
        assert!(validate_raw_config(&forbidden).is_err());

        // 55 entries are expected to exceed the allowed count.
        let mut many = BTreeMap::new();
        for i in 0..55 {
            many.insert(format!("config-{}", i), "value".to_string());
        }
        assert!(validate_raw_config(&many).is_err());
    }

    #[test]
    fn test_validate_int_or_percent() {
        assert!(validate_optional_int_or_percent("1").is_ok());
        assert!(validate_optional_int_or_percent("25%").is_ok());
        assert!(validate_optional_int_or_percent("100%").is_ok());
        // An empty value is accepted.
        assert!(validate_optional_int_or_percent("").is_ok());
        assert!(validate_optional_int_or_percent("abc").is_err());
        assert!(validate_optional_int_or_percent("25%%").is_err());
    }

    // ---- Cluster sub-spec defaults ----

    #[test]
    fn test_tls_spec_default() {
        let tls = TlsSpec::default();
        assert!(!tls.enabled);
        assert!(tls.cert_secret_name.is_none());
        assert!(!tls.mtls_enabled);
    }

    #[test]
    fn test_metrics_spec_default() {
        let metrics = MetricsSpec::default();
        assert!(metrics.enabled);
        assert_eq!(metrics.port, 9090);
    }

    #[test]
    fn test_pdb_spec_default() {
        let pdb = PdbSpec::default();
        assert!(pdb.enabled);
        assert!(pdb.min_available.is_none());
        assert_eq!(pdb.max_unavailable, Some("1".to_string()));
    }

    #[test]
    fn test_service_monitor_labels() {
        let mut labels = BTreeMap::new();
        labels.insert("release".to_string(), "prometheus".to_string());
        assert!(validate_service_monitor_labels(&labels).is_ok());

        // 15 entries are expected to exceed the allowed count.
        let mut many = BTreeMap::new();
        for i in 0..15 {
            many.insert(format!("label-{}", i), "value".to_string());
        }
        assert!(validate_service_monitor_labels(&many).is_err());
    }

    #[test]
    fn test_cluster_condition_time_format() {
        let condition = ClusterCondition {
            condition_type: "Ready".to_string(),
            status: "True".to_string(),
            last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
            reason: Some("AllReplicasReady".to_string()),
            message: Some("All replicas are ready".to_string()),
        };
        // RFC 3339 separates the date and time parts with 'T'.
        assert!(condition.last_transition_time.unwrap().contains('T'));
    }

    // ---- CDC and sink option validators ----

    #[test]
    fn test_validate_snapshot_mode() {
        assert!(validate_snapshot_mode("initial").is_ok());
        assert!(validate_snapshot_mode("never").is_ok());
        assert!(validate_snapshot_mode("when_needed").is_ok());
        assert!(validate_snapshot_mode("exported").is_ok());
        assert!(validate_snapshot_mode("custom").is_ok());
        // An empty value is accepted.
        assert!(validate_snapshot_mode("").is_ok());
        assert!(validate_snapshot_mode("invalid").is_err());
    }

    #[test]
    fn test_validate_decoding_plugin() {
        assert!(validate_decoding_plugin("pgoutput").is_ok());
        assert!(validate_decoding_plugin("wal2json").is_ok());
        assert!(validate_decoding_plugin("decoderbufs").is_ok());
        // An empty value is accepted.
        assert!(validate_decoding_plugin("").is_ok());
        assert!(validate_decoding_plugin("invalid").is_err());
    }

    #[test]
    fn test_validate_mysql_snapshot_mode() {
        assert!(validate_mysql_snapshot_mode("initial").is_ok());
        assert!(validate_mysql_snapshot_mode("never").is_ok());
        assert!(validate_mysql_snapshot_mode("when_needed").is_ok());
        assert!(validate_mysql_snapshot_mode("schema_only").is_ok());
        assert!(validate_mysql_snapshot_mode("").is_ok());
        assert!(validate_mysql_snapshot_mode("invalid").is_err());
    }

    #[test]
    fn test_validate_output_format() {
        assert!(validate_output_format("json").is_ok());
        assert!(validate_output_format("jsonl").is_ok());
        assert!(validate_output_format("parquet").is_ok());
        assert!(validate_output_format("avro").is_ok());
        assert!(validate_output_format("").is_ok());
        assert!(validate_output_format("xml").is_err());
    }

    #[test]
    fn test_validate_s3_compression() {
        assert!(validate_s3_compression("none").is_ok());
        assert!(validate_s3_compression("gzip").is_ok());
        assert!(validate_s3_compression("snappy").is_ok());
        assert!(validate_s3_compression("lz4").is_ok());
        assert!(validate_s3_compression("zstd").is_ok());
        assert!(validate_s3_compression("").is_ok());
        assert!(validate_s3_compression("bzip2").is_err());
    }

    #[test]
    fn test_validate_http_method() {
        assert!(validate_http_method("POST").is_ok());
        assert!(validate_http_method("PUT").is_ok());
        assert!(validate_http_method("PATCH").is_ok());
        assert!(validate_http_method("").is_ok());
        assert!(validate_http_method("GET").is_err());
        assert!(validate_http_method("DELETE").is_err());
    }

    #[test]
    fn test_postgres_cdc_config_default() {
        let config = PostgresCdcConfig::default();
        assert!(config.slot_name.is_none());
        assert!(config.publication.is_none());
        assert!(config.snapshot_mode.is_none());
    }

    #[test]
    fn test_s3_sink_config_default() {
        let config = S3SinkConfig::default();
        assert!(config.bucket.is_none());
        assert!(config.region.is_none());
        assert!(config.format.is_none());
    }

    #[test]
    fn test_table_spec_with_columns() {
        let table = TableSpec {
            schema: Some("public".to_string()),
            table: "orders".to_string(),
            topic: None,
            columns: vec!["id".to_string(), "customer_id".to_string()],
            exclude_columns: vec!["password".to_string()],
            column_masks: BTreeMap::from([("email".to_string(), "***@***.***".to_string())]),
        };
        assert_eq!(table.columns.len(), 2);
        assert_eq!(table.exclude_columns.len(), 1);
        assert_eq!(table.column_masks.len(), 1);
    }

    // ---- Messaging sources ----

    #[test]
    fn test_kafka_source_config_default() {
        let config = KafkaSourceConfig::default();
        assert!(config.brokers.is_none());
        assert!(config.topic.is_none());
        assert!(config.consumer_group.is_none());
    }

    #[test]
    fn test_validate_kafka_start_offset() {
        assert!(validate_kafka_start_offset("earliest").is_ok());
        assert!(validate_kafka_start_offset("latest").is_ok());
        assert!(validate_kafka_start_offset("").is_ok());
        assert!(validate_kafka_start_offset("invalid").is_err());
    }

    #[test]
    fn test_validate_kafka_security_protocol() {
        assert!(validate_kafka_security_protocol("plaintext").is_ok());
        assert!(validate_kafka_security_protocol("ssl").is_ok());
        assert!(validate_kafka_security_protocol("sasl_plaintext").is_ok());
        assert!(validate_kafka_security_protocol("sasl_ssl").is_ok());
        assert!(validate_kafka_security_protocol("").is_ok());
        assert!(validate_kafka_security_protocol("invalid").is_err());
    }

    #[test]
    fn test_mqtt_source_config_default() {
        let config = MqttSourceConfig::default();
        assert!(config.broker_url.is_none());
        assert!(config.topics.is_none());
        assert!(config.client_id.is_none());
    }

    #[test]
    fn test_validate_mqtt_qos() {
        assert!(validate_mqtt_qos("at_most_once").is_ok());
        assert!(validate_mqtt_qos("at_least_once").is_ok());
        assert!(validate_mqtt_qos("exactly_once").is_ok());
        assert!(validate_mqtt_qos("").is_ok());
        assert!(validate_mqtt_qos("invalid").is_err());
    }

    #[test]
    fn test_sqs_source_config_default() {
        let config = SqsSourceConfig::default();
        assert!(config.queue_url.is_none());
        assert!(config.region.is_none());
        assert!(config.max_messages.is_none());
    }

    #[test]
    fn test_pubsub_source_config_default() {
        let config = PubSubSourceConfig::default();
        assert!(config.project_id.is_none());
        assert!(config.subscription.is_none());
        assert!(config.topic.is_none());
    }

    // ---- Sinks ----

    #[test]
    fn test_kafka_sink_config_default() {
        let config = KafkaSinkConfig::default();
        assert!(config.brokers.is_none());
        assert!(config.topic.is_none());
        assert!(config.acks.is_none());
    }

    #[test]
    fn test_validate_kafka_acks() {
        assert!(validate_kafka_acks("none").is_ok());
        assert!(validate_kafka_acks("leader").is_ok());
        assert!(validate_kafka_acks("all").is_ok());
        assert!(validate_kafka_acks("0").is_ok());
        assert!(validate_kafka_acks("1").is_ok());
        assert!(validate_kafka_acks("-1").is_ok());
        assert!(validate_kafka_acks("").is_ok());
        assert!(validate_kafka_acks("invalid").is_err());
    }

    #[test]
    fn test_gcs_sink_config_default() {
        let config = GcsSinkConfig::default();
        assert!(config.bucket.is_none());
        assert!(config.prefix.is_none());
        assert!(config.format.is_none());
    }

    #[test]
    fn test_azure_blob_sink_config_default() {
        let config = AzureBlobSinkConfig::default();
        assert!(config.account_name.is_none());
        assert!(config.container.is_none());
        assert!(config.format.is_none());
    }

    #[test]
    fn test_snowflake_sink_config_default() {
        let config = SnowflakeSinkConfig::default();
        assert!(config.account.is_none());
        assert!(config.user.is_none());
        assert!(config.database.is_none());
    }

    #[test]
    fn test_bigquery_sink_config_default() {
        let config = BigQuerySinkConfig::default();
        assert!(config.project_id.is_none());
        assert!(config.dataset_id.is_none());
        assert!(config.table_id.is_none());
    }

    #[test]
    fn test_redshift_sink_config_default() {
        let config = RedshiftSinkConfig::default();
        assert!(config.host.is_none());
        assert!(config.database.is_none());
        assert!(config.table.is_none());
    }

    #[test]
    fn test_validate_redshift_ssl_mode() {
        assert!(validate_redshift_ssl_mode("disable").is_ok());
        assert!(validate_redshift_ssl_mode("prefer").is_ok());
        assert!(validate_redshift_ssl_mode("require").is_ok());
        assert!(validate_redshift_ssl_mode("verify-ca").is_ok());
        assert!(validate_redshift_ssl_mode("verify-full").is_ok());
        assert!(validate_redshift_ssl_mode("").is_ok());
        assert!(validate_redshift_ssl_mode("invalid").is_err());
    }
}