1use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use regex::Regex;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeMap;
12use std::sync::LazyLock;
13use validator::{Validate, ValidationError};
14
15static QUANTITY_REGEX: LazyLock<Regex> =
17 LazyLock::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());
18
19static NAME_REGEX: LazyLock<Regex> =
21 LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25 if QUANTITY_REGEX.is_match(value) {
26 Ok(())
27 } else {
28 Err(ValidationError::new("invalid_quantity")
29 .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30 }
31}
32
33fn validate_image(value: &str) -> Result<(), ValidationError> {
35 if value.is_empty() {
36 return Ok(()); }
38 if value.len() > 255 {
39 return Err(ValidationError::new("image_too_long")
40 .with_message("image reference exceeds 255 characters".into()));
41 }
42 if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44 return Err(ValidationError::new("invalid_image")
45 .with_message(format!("'{}' is not a valid container image", value).into()));
46 }
47 Ok(())
48}
49
50fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52 if value.is_empty() {
53 return Ok(()); }
55 if value.len() > 63 {
56 return Err(
57 ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58 );
59 }
60 if !NAME_REGEX.is_match(value) {
61 return Err(ValidationError::new("invalid_name").with_message(
62 format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63 ));
64 }
65 Ok(())
66}
67
68fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
70 const MAX_ENV_VARS: usize = 100;
72 if vars.len() > MAX_ENV_VARS {
73 return Err(ValidationError::new("too_many_env_vars").with_message(
74 format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
75 ));
76 }
77 for var in vars {
78 if var.name.is_empty() || var.name.len() > 256 {
80 return Err(ValidationError::new("invalid_env_name")
81 .with_message("environment variable name must be 1-256 characters".into()));
82 }
83 let forbidden_prefixes = ["LD_", "DYLD_", "PATH", "HOME", "USER"];
85 for prefix in forbidden_prefixes {
86 if var.name.starts_with(prefix) && var.value.is_some() {
87 return Err(ValidationError::new("forbidden_env_var").with_message(
88 format!(
89 "environment variable '{}' is not allowed for security",
90 var.name
91 )
92 .into(),
93 ));
94 }
95 }
96 }
97 Ok(())
98}
99
100#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
106#[kube(
107 group = "rivven.hupe1980.github.io",
108 version = "v1alpha1",
109 kind = "RivvenCluster",
110 plural = "rivvenclusters",
111 shortname = "rc",
112 namespaced,
113 status = "RivvenClusterStatus",
114 printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
115 printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
116 printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
117 printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
118)]
119#[serde(rename_all = "camelCase")]
120pub struct RivvenClusterSpec {
121 #[serde(default = "default_replicas")]
123 #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
124 pub replicas: i32,
125
126 #[serde(default = "default_version")]
128 #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
129 pub version: String,
130
131 #[serde(default)]
134 #[validate(custom(function = "validate_optional_image"))]
135 pub image: Option<String>,
136
137 #[serde(default = "default_image_pull_policy")]
139 #[validate(custom(function = "validate_pull_policy"))]
140 pub image_pull_policy: String,
141
142 #[serde(default)]
144 #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
145 pub image_pull_secrets: Vec<String>,
146
147 #[serde(default)]
149 #[validate(nested)]
150 pub storage: StorageSpec,
151
152 #[serde(default)]
154 #[schemars(skip)]
155 pub resources: Option<ResourceRequirements>,
156
157 #[serde(default)]
159 #[validate(nested)]
160 pub config: BrokerConfig,
161
162 #[serde(default)]
164 #[validate(nested)]
165 pub tls: TlsSpec,
166
167 #[serde(default)]
169 #[validate(nested)]
170 pub metrics: MetricsSpec,
171
172 #[serde(default)]
174 #[schemars(skip)]
175 pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,
176
177 #[serde(default)]
179 #[validate(custom(function = "validate_node_selector"))]
180 pub node_selector: BTreeMap<String, String>,
181
182 #[serde(default)]
184 #[schemars(skip)]
185 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
186
187 #[serde(default)]
189 #[validate(nested)]
190 pub pod_disruption_budget: PdbSpec,
191
192 #[serde(default)]
194 #[validate(custom(function = "validate_optional_k8s_name"))]
195 pub service_account: Option<String>,
196
197 #[serde(default)]
199 #[validate(custom(function = "validate_annotations"))]
200 pub pod_annotations: BTreeMap<String, String>,
201
202 #[serde(default)]
204 #[validate(custom(function = "validate_labels"))]
205 pub pod_labels: BTreeMap<String, String>,
206
207 #[serde(default)]
209 #[schemars(skip)]
210 #[validate(custom(function = "validate_env_vars"))]
211 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
212
213 #[serde(default)]
215 #[validate(nested)]
216 pub liveness_probe: ProbeSpec,
217
218 #[serde(default)]
220 #[validate(nested)]
221 pub readiness_probe: ProbeSpec,
222
223 #[serde(default)]
225 #[schemars(skip)]
226 pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,
227
228 #[serde(default)]
230 #[schemars(skip)]
231 pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
232}
233
234fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
236 validate_image(image)
237}
238
239fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
241 match policy {
242 "Always" | "IfNotPresent" | "Never" => Ok(()),
243 _ => Err(ValidationError::new("invalid_pull_policy")
244 .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
245 }
246}
247
248fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
250 if selectors.len() > 20 {
251 return Err(ValidationError::new("too_many_selectors")
252 .with_message("maximum 20 node selectors allowed".into()));
253 }
254 for (key, value) in selectors {
255 if key.len() > 253 || value.len() > 63 {
256 return Err(ValidationError::new("selector_too_long")
257 .with_message("selector key max 253 chars, value max 63 chars".into()));
258 }
259 }
260 Ok(())
261}
262
263fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
265 if name.is_empty() {
266 return Ok(()); }
268 validate_k8s_name(name)
269}
270
271fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
273 if annotations.len() > 50 {
274 return Err(ValidationError::new("too_many_annotations")
275 .with_message("maximum 50 annotations allowed".into()));
276 }
277 for (key, value) in annotations {
278 if key.len() > 253 {
280 return Err(ValidationError::new("annotation_key_too_long")
281 .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
282 }
283 if value.len() > 262144 {
285 return Err(ValidationError::new("annotation_value_too_long")
286 .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
287 }
288 }
289 Ok(())
290}
291
292fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
294 if labels.len() > 20 {
295 return Err(ValidationError::new("too_many_labels")
296 .with_message("maximum 20 labels allowed".into()));
297 }
298 for (key, value) in labels {
299 if key.len() > 253 || value.len() > 63 {
300 return Err(ValidationError::new("label_too_long")
301 .with_message("label key max 253 chars, value max 63 chars".into()));
302 }
303 if key.starts_with("app.kubernetes.io/") {
305 return Err(ValidationError::new("reserved_label").with_message(
306 format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
307 ));
308 }
309 }
310 Ok(())
311}
312
313#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
315#[serde(rename_all = "camelCase")]
316pub struct StorageSpec {
317 #[serde(default = "default_storage_size")]
319 #[validate(custom(function = "validate_quantity"))]
320 pub size: String,
321
322 #[serde(default)]
324 #[validate(custom(function = "validate_optional_k8s_name"))]
325 pub storage_class_name: Option<String>,
326
327 #[serde(default = "default_access_modes")]
329 #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
330 #[validate(custom(function = "validate_access_modes"))]
331 pub access_modes: Vec<String>,
332}
333
334fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
336 let valid_modes = [
337 "ReadWriteOnce",
338 "ReadOnlyMany",
339 "ReadWriteMany",
340 "ReadWriteOncePod",
341 ];
342 for mode in modes {
343 if !valid_modes.contains(&mode.as_str()) {
344 return Err(ValidationError::new("invalid_access_mode")
345 .with_message(format!("'{}' is not a valid access mode", mode).into()));
346 }
347 }
348 Ok(())
349}
350
351impl Default for StorageSpec {
352 fn default() -> Self {
353 Self {
354 size: default_storage_size(),
355 storage_class_name: None,
356 access_modes: default_access_modes(),
357 }
358 }
359}
360
361#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
363#[serde(rename_all = "camelCase")]
364pub struct BrokerConfig {
365 #[serde(default = "default_partitions")]
367 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
368 pub default_partitions: i32,
369
370 #[serde(default = "default_replication_factor")]
372 #[validate(range(
373 min = 1,
374 max = 10,
375 message = "replication factor must be between 1 and 10"
376 ))]
377 pub default_replication_factor: i32,
378
379 #[serde(default = "default_log_retention_hours")]
381 #[validate(range(
382 min = 1,
383 max = 8760,
384 message = "retention hours must be between 1 and 8760"
385 ))]
386 pub log_retention_hours: i32,
387
388 #[serde(default = "default_log_segment_bytes")]
390 #[validate(custom(function = "validate_segment_size"))]
391 pub log_segment_bytes: i64,
392
393 #[serde(default = "default_max_message_bytes")]
395 #[validate(custom(function = "validate_message_size"))]
396 pub max_message_bytes: i64,
397
398 #[serde(default = "default_true")]
400 pub auto_create_topics: bool,
401
402 #[serde(default = "default_true")]
404 pub compression_enabled: bool,
405
406 #[serde(default = "default_compression")]
408 #[validate(custom(function = "validate_compression_type"))]
409 pub compression_type: String,
410
411 #[serde(default = "default_election_timeout")]
413 #[validate(range(
414 min = 100,
415 max = 60000,
416 message = "election timeout must be between 100ms and 60s"
417 ))]
418 pub raft_election_timeout_ms: i32,
419
420 #[serde(default = "default_heartbeat_interval")]
422 #[validate(range(
423 min = 10,
424 max = 10000,
425 message = "heartbeat interval must be between 10ms and 10s"
426 ))]
427 pub raft_heartbeat_interval_ms: i32,
428
429 #[serde(default)]
431 #[validate(custom(function = "validate_raw_config"))]
432 pub raw: BTreeMap<String, String>,
433}
434
435fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
437 match compression {
438 "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
439 _ => Err(ValidationError::new("invalid_compression")
440 .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
441 }
442}
443
444fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
446 const MIN_SEGMENT_SIZE: i64 = 1_048_576; const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
449 return Err(ValidationError::new("invalid_segment_size")
450 .with_message("segment size must be between 1MB and 10GB".into()));
451 }
452 Ok(())
453}
454
455fn validate_message_size(size: i64) -> Result<(), ValidationError> {
457 const MIN_MESSAGE_SIZE: i64 = 1_024; const MAX_MESSAGE_SIZE: i64 = 104_857_600; if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
460 return Err(ValidationError::new("invalid_message_size")
461 .with_message("max message size must be between 1KB and 100MB".into()));
462 }
463 Ok(())
464}
465
466fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
468 if config.len() > 50 {
469 return Err(ValidationError::new("too_many_raw_configs")
470 .with_message("maximum 50 raw configuration entries allowed".into()));
471 }
472 for (key, value) in config {
473 if key.len() > 128 || value.len() > 4096 {
474 return Err(ValidationError::new("raw_config_too_long")
475 .with_message("raw config key max 128 chars, value max 4096 chars".into()));
476 }
477 let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
479 if forbidden_keys.contains(&key.as_str()) {
480 return Err(ValidationError::new("forbidden_raw_config")
481 .with_message(format!("raw config key '{}' is not allowed", key).into()));
482 }
483 }
484 Ok(())
485}
486
487impl Default for BrokerConfig {
488 fn default() -> Self {
489 Self {
490 default_partitions: default_partitions(),
491 default_replication_factor: default_replication_factor(),
492 log_retention_hours: default_log_retention_hours(),
493 log_segment_bytes: default_log_segment_bytes(),
494 max_message_bytes: default_max_message_bytes(),
495 auto_create_topics: default_true(),
496 compression_enabled: default_true(),
497 compression_type: default_compression(),
498 raft_election_timeout_ms: default_election_timeout(),
499 raft_heartbeat_interval_ms: default_heartbeat_interval(),
500 raw: BTreeMap::new(),
501 }
502 }
503}
504
505#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
507#[serde(rename_all = "camelCase")]
508pub struct TlsSpec {
509 #[serde(default)]
511 pub enabled: bool,
512
513 #[serde(default)]
515 #[validate(custom(function = "validate_optional_k8s_name"))]
516 pub cert_secret_name: Option<String>,
517
518 #[serde(default)]
520 pub mtls_enabled: bool,
521
522 #[serde(default)]
524 #[validate(custom(function = "validate_optional_k8s_name"))]
525 pub ca_secret_name: Option<String>,
526}
527
528#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
530#[serde(rename_all = "camelCase")]
531pub struct MetricsSpec {
532 #[serde(default = "default_true")]
534 pub enabled: bool,
535
536 #[serde(default = "default_metrics_port")]
538 #[validate(range(
539 min = 1024,
540 max = 65535,
541 message = "metrics port must be between 1024 and 65535"
542 ))]
543 pub port: i32,
544
545 #[serde(default)]
547 #[validate(nested)]
548 pub service_monitor: ServiceMonitorSpec,
549}
550
551impl Default for MetricsSpec {
552 fn default() -> Self {
553 Self {
554 enabled: true,
555 port: 9090,
556 service_monitor: ServiceMonitorSpec::default(),
557 }
558 }
559}
560
561#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
563#[serde(rename_all = "camelCase")]
564pub struct ServiceMonitorSpec {
565 #[serde(default)]
567 pub enabled: bool,
568
569 #[serde(default)]
571 #[validate(custom(function = "validate_optional_k8s_name"))]
572 pub namespace: Option<String>,
573
574 #[serde(default = "default_scrape_interval")]
576 #[validate(custom(function = "validate_duration"))]
577 pub interval: String,
578
579 #[serde(default)]
581 #[validate(custom(function = "validate_service_monitor_labels"))]
582 pub labels: BTreeMap<String, String>,
583}
584
585fn validate_duration(duration: &str) -> Result<(), ValidationError> {
587 static DURATION_REGEX: LazyLock<Regex> =
588 LazyLock::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
589 if !DURATION_REGEX.is_match(duration) {
590 return Err(ValidationError::new("invalid_duration").with_message(
591 format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
592 ));
593 }
594 Ok(())
595}
596
597fn validate_service_monitor_labels(
599 labels: &BTreeMap<String, String>,
600) -> Result<(), ValidationError> {
601 if labels.len() > 10 {
602 return Err(ValidationError::new("too_many_labels")
603 .with_message("maximum 10 ServiceMonitor labels allowed".into()));
604 }
605 Ok(())
606}
607
608#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
610#[serde(rename_all = "camelCase")]
611pub struct PdbSpec {
612 #[serde(default = "default_true")]
614 pub enabled: bool,
615
616 #[serde(default)]
619 #[validate(custom(function = "validate_optional_int_or_percent"))]
620 pub min_available: Option<String>,
621
622 #[serde(default = "default_max_unavailable")]
625 #[validate(custom(function = "validate_optional_int_or_percent"))]
626 pub max_unavailable: Option<String>,
627}
628
629fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
631 if value.is_empty() {
632 return Ok(());
633 }
634 static INT_OR_PERCENT_REGEX: LazyLock<Regex> =
636 LazyLock::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
637 if !INT_OR_PERCENT_REGEX.is_match(value) {
638 return Err(ValidationError::new("invalid_int_or_percent").with_message(
639 format!(
640 "'{}' must be an integer or percentage (e.g., '1' or '25%')",
641 value
642 )
643 .into(),
644 ));
645 }
646 Ok(())
647}
648
649impl Default for PdbSpec {
650 fn default() -> Self {
651 Self {
652 enabled: true,
653 min_available: None,
654 max_unavailable: Some("1".to_string()),
655 }
656 }
657}
658
659#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
661#[serde(rename_all = "camelCase")]
662pub struct ProbeSpec {
663 #[serde(default = "default_true")]
665 pub enabled: bool,
666
667 #[serde(default = "default_initial_delay")]
669 #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
670 pub initial_delay_seconds: i32,
671
672 #[serde(default = "default_period")]
674 #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
675 pub period_seconds: i32,
676
677 #[serde(default = "default_timeout")]
679 #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
680 pub timeout_seconds: i32,
681
682 #[serde(default = "default_one")]
684 #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
685 pub success_threshold: i32,
686
687 #[serde(default = "default_three")]
689 #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
690 pub failure_threshold: i32,
691}
692
693impl Default for ProbeSpec {
694 fn default() -> Self {
695 Self {
696 enabled: true,
697 initial_delay_seconds: 30,
698 period_seconds: 10,
699 timeout_seconds: 5,
700 success_threshold: 1,
701 failure_threshold: 3,
702 }
703 }
704}
705
706#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
708#[serde(rename_all = "camelCase")]
709pub struct RivvenClusterStatus {
710 pub phase: ClusterPhase,
712
713 pub replicas: i32,
715
716 pub ready_replicas: i32,
718
719 pub updated_replicas: i32,
721
722 pub observed_generation: i64,
724
725 #[serde(default)]
727 pub conditions: Vec<ClusterCondition>,
728
729 #[serde(default)]
731 pub broker_endpoints: Vec<String>,
732
733 pub leader: Option<String>,
735
736 pub last_updated: Option<String>,
738
739 pub message: Option<String>,
741}
742
743#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
745pub enum ClusterPhase {
746 #[default]
748 Pending,
749 Provisioning,
751 Running,
753 Updating,
755 Degraded,
757 Failed,
759 Terminating,
761}
762
763#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
765#[serde(rename_all = "camelCase")]
766pub struct ClusterCondition {
767 #[serde(rename = "type")]
769 pub condition_type: String,
770
771 pub status: String,
773
774 pub reason: Option<String>,
776
777 pub message: Option<String>,
779
780 pub last_transition_time: Option<String>,
782}
783
784fn default_replicas() -> i32 {
786 3
787}
788
789fn default_version() -> String {
790 "0.0.1".to_string()
791}
792
793fn default_image_pull_policy() -> String {
794 "IfNotPresent".to_string()
795}
796
797fn default_storage_size() -> String {
798 "10Gi".to_string()
799}
800
801fn default_access_modes() -> Vec<String> {
802 vec!["ReadWriteOnce".to_string()]
803}
804
805fn default_partitions() -> i32 {
806 3
807}
808
809fn default_replication_factor() -> i32 {
810 2
811}
812
813fn default_log_retention_hours() -> i32 {
814 168 }
816
817fn default_log_segment_bytes() -> i64 {
818 1073741824 }
820
821fn default_max_message_bytes() -> i64 {
822 1048576 }
824
825fn default_compression() -> String {
826 "lz4".to_string()
827}
828
829fn default_election_timeout() -> i32 {
830 1000
831}
832
833fn default_heartbeat_interval() -> i32 {
834 100
835}
836
837fn default_metrics_port() -> i32 {
838 9090
839}
840
841fn default_scrape_interval() -> String {
842 "30s".to_string()
843}
844
845fn default_max_unavailable() -> Option<String> {
846 Some("1".to_string())
847}
848
849fn default_initial_delay() -> i32 {
850 30
851}
852
853fn default_period() -> i32 {
854 10
855}
856
857fn default_timeout() -> i32 {
858 5
859}
860
861fn default_one() -> i32 {
862 1
863}
864
865fn default_three() -> i32 {
866 3
867}
868
869fn default_true() -> bool {
870 true
871}
872
873impl RivvenClusterSpec {
874 pub fn get_image(&self) -> String {
876 if let Some(ref image) = self.image {
877 image.clone()
878 } else {
879 format!("ghcr.io/hupe1980/rivven:{}", self.version)
880 }
881 }
882
883 pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
885 let mut labels = BTreeMap::new();
886 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
887 labels.insert(
888 "app.kubernetes.io/instance".to_string(),
889 cluster_name.to_string(),
890 );
891 labels.insert(
892 "app.kubernetes.io/component".to_string(),
893 "broker".to_string(),
894 );
895 labels.insert(
896 "app.kubernetes.io/managed-by".to_string(),
897 "rivven-operator".to_string(),
898 );
899 labels.insert(
900 "app.kubernetes.io/version".to_string(),
901 self.version.clone(),
902 );
903 labels
904 }
905
906 pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
908 let mut labels = BTreeMap::new();
909 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
910 labels.insert(
911 "app.kubernetes.io/instance".to_string(),
912 cluster_name.to_string(),
913 );
914 labels
915 }
916}
917
918#[allow(dead_code)]
931#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
932#[kube(
933 group = "rivven.hupe1980.github.io",
934 version = "v1alpha1",
935 kind = "RivvenConnect",
936 plural = "rivvenconnects",
937 shortname = "rc",
938 namespaced,
939 status = "RivvenConnectStatus",
940 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
941 printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
942 printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
943 printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
944 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
945 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
946)]
947#[serde(rename_all = "camelCase")]
948pub struct RivvenConnectSpec {
949 #[validate(nested)]
951 pub cluster_ref: ClusterReference,
952
953 #[serde(default = "default_connect_replicas")]
955 #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
956 pub replicas: i32,
957
958 #[serde(default = "default_version")]
960 pub version: String,
961
962 #[serde(default)]
964 #[validate(custom(function = "validate_optional_image"))]
965 pub image: Option<String>,
966
967 #[serde(default = "default_image_pull_policy")]
969 #[validate(custom(function = "validate_pull_policy"))]
970 pub image_pull_policy: String,
971
972 #[serde(default)]
974 pub image_pull_secrets: Vec<String>,
975
976 #[serde(default)]
978 pub resources: Option<serde_json::Value>,
979
980 #[serde(default)]
982 #[validate(nested)]
983 pub config: ConnectConfigSpec,
984
985 #[serde(default)]
987 #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
988 pub sources: Vec<SourceConnectorSpec>,
989
990 #[serde(default)]
992 #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
993 pub sinks: Vec<SinkConnectorSpec>,
994
995 #[serde(default)]
997 #[validate(nested)]
998 pub settings: GlobalConnectSettings,
999
1000 #[serde(default)]
1002 #[validate(nested)]
1003 pub tls: ConnectTlsSpec,
1004
1005 #[serde(default)]
1007 #[validate(custom(function = "validate_annotations"))]
1008 pub pod_annotations: BTreeMap<String, String>,
1009
1010 #[serde(default)]
1012 #[validate(custom(function = "validate_labels"))]
1013 pub pod_labels: BTreeMap<String, String>,
1014
1015 #[serde(default)]
1017 #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
1018 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
1019
1020 #[serde(default)]
1022 pub node_selector: BTreeMap<String, String>,
1023
1024 #[serde(default)]
1026 #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
1027 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
1028
1029 #[serde(default)]
1031 pub affinity: Option<serde_json::Value>,
1032
1033 #[serde(default)]
1035 #[validate(custom(function = "validate_optional_k8s_name"))]
1036 pub service_account: Option<String>,
1037
1038 #[serde(default)]
1040 pub security_context: Option<serde_json::Value>,
1041
1042 #[serde(default)]
1044 pub container_security_context: Option<serde_json::Value>,
1045}
1046
1047#[allow(dead_code)]
1049#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1050#[serde(rename_all = "camelCase")]
1051pub struct ClusterReference {
1052 #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
1054 #[validate(custom(function = "validate_k8s_name"))]
1055 pub name: String,
1056
1057 #[serde(default)]
1059 #[validate(custom(function = "validate_optional_k8s_name"))]
1060 pub namespace: Option<String>,
1061}
1062
1063#[allow(dead_code)]
1098#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1099#[kube(
1100 group = "rivven.hupe1980.github.io",
1101 version = "v1alpha1",
1102 kind = "RivvenTopic",
1103 plural = "rivventopics",
1104 shortname = "rt",
1105 namespaced,
1106 status = "RivvenTopicStatus",
1107 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
1108 printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
1109 printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
1110 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
1111 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
1112)]
1113#[serde(rename_all = "camelCase")]
1114pub struct RivvenTopicSpec {
1115 #[validate(nested)]
1117 pub cluster_ref: ClusterReference,
1118
1119 #[serde(default = "default_rivven_topic_partitions")]
1122 #[validate(range(
1123 min = 1,
1124 max = 10000,
1125 message = "partitions must be between 1 and 10000"
1126 ))]
1127 pub partitions: i32,
1128
1129 #[serde(default = "default_rivven_topic_replication")]
1132 #[validate(range(
1133 min = 1,
1134 max = 10,
1135 message = "replication factor must be between 1 and 10"
1136 ))]
1137 pub replication_factor: i32,
1138
1139 #[serde(default)]
1141 #[validate(nested)]
1142 pub config: TopicConfig,
1143
1144 #[serde(default)]
1146 #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
1147 #[validate(custom(function = "validate_topic_acls"))]
1148 pub acls: Vec<TopicAcl>,
1149
1150 #[serde(default = "default_true")]
1153 pub delete_on_remove: bool,
1154
1155 #[serde(default)]
1157 #[validate(custom(function = "validate_labels"))]
1158 pub topic_labels: BTreeMap<String, String>,
1159}
1160
1161#[allow(dead_code)]
1162fn default_rivven_topic_partitions() -> i32 {
1163 3
1164}
1165
1166#[allow(dead_code)]
1167fn default_rivven_topic_replication() -> i32 {
1168 1
1169}
1170
1171#[allow(dead_code)]
1173#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1174#[serde(rename_all = "camelCase")]
1175pub struct TopicConfig {
1176 #[serde(default = "default_topic_retention_ms")]
1179 #[validate(range(
1180 min = 3600000,
1181 max = 315360000000_i64,
1182 message = "retention must be between 1 hour and 10 years"
1183 ))]
1184 pub retention_ms: i64,
1185
1186 #[serde(default = "default_topic_retention_bytes")]
1189 #[validate(custom(function = "validate_topic_retention_bytes"))]
1190 pub retention_bytes: i64,
1191
1192 #[serde(default = "default_topic_segment_bytes")]
1194 #[validate(custom(function = "validate_segment_size"))]
1195 pub segment_bytes: i64,
1196
1197 #[serde(default = "default_topic_cleanup_policy")]
1199 #[validate(custom(function = "validate_topic_cleanup_policy"))]
1200 pub cleanup_policy: String,
1201
1202 #[serde(default = "default_topic_compression")]
1204 #[validate(custom(function = "validate_topic_compression"))]
1205 pub compression_type: String,
1206
1207 #[serde(default = "default_topic_min_isr")]
1209 #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1210 pub min_insync_replicas: i32,
1211
1212 #[serde(default = "default_max_message_bytes")]
1214 #[validate(custom(function = "validate_message_size"))]
1215 pub max_message_bytes: i64,
1216
1217 #[serde(default = "default_true")]
1219 pub message_timestamp_enabled: bool,
1220
1221 #[serde(default = "default_topic_timestamp_type")]
1223 #[validate(custom(function = "validate_topic_timestamp_type"))]
1224 pub message_timestamp_type: String,
1225
1226 #[serde(default = "default_true")]
1228 pub idempotent_writes: bool,
1229
1230 #[serde(default)]
1232 #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1233 pub flush_interval_ms: i64,
1234
1235 #[serde(default)]
1237 #[validate(custom(function = "validate_topic_custom_config"))]
1238 pub custom: BTreeMap<String, String>,
1239}
1240
1241#[allow(dead_code)]
1242fn default_topic_retention_ms() -> i64 {
1243 604800000 }
1245
1246#[allow(dead_code)]
1247fn default_topic_retention_bytes() -> i64 {
1248 -1 }
1250
1251#[allow(dead_code)]
1252fn default_topic_segment_bytes() -> i64 {
1253 1073741824 }
1255
1256#[allow(dead_code)]
1257fn default_topic_cleanup_policy() -> String {
1258 "delete".to_string()
1259}
1260
1261#[allow(dead_code)]
1262fn default_topic_compression() -> String {
1263 "lz4".to_string()
1264}
1265
1266#[allow(dead_code)]
1267fn default_topic_min_isr() -> i32 {
1268 1
1269}
1270
1271#[allow(dead_code)]
1272fn default_topic_timestamp_type() -> String {
1273 "CreateTime".to_string()
1274}
1275
1276#[allow(dead_code)]
1277fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1278 if value == -1 || (1048576..=10995116277760).contains(&value) {
1279 Ok(())
1280 } else {
1281 Err(ValidationError::new("invalid_retention_bytes")
1282 .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1283 }
1284}
1285
1286#[allow(dead_code)]
1287fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1288 match policy {
1289 "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1290 _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1291 "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1292 )),
1293 }
1294}
1295
1296#[allow(dead_code)]
1297fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1298 match compression {
1299 "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1300 _ => Err(ValidationError::new("invalid_compression")
1301 .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1302 }
1303}
1304
1305#[allow(dead_code)]
1306fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1307 match ts_type {
1308 "CreateTime" | "LogAppendTime" => Ok(()),
1309 _ => Err(ValidationError::new("invalid_timestamp_type")
1310 .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1311 }
1312}
1313
1314#[allow(dead_code)]
1315fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1316 if config.len() > 50 {
1317 return Err(ValidationError::new("too_many_custom_configs")
1318 .with_message("maximum 50 custom config entries allowed".into()));
1319 }
1320 for (key, value) in config {
1321 if key.len() > 128 || value.len() > 4096 {
1322 return Err(ValidationError::new("config_too_long")
1323 .with_message("config key max 128 chars, value max 4096 chars".into()));
1324 }
1325 let protected = [
1327 "retention.ms",
1328 "retention.bytes",
1329 "segment.bytes",
1330 "cleanup.policy",
1331 ];
1332 if protected.contains(&key.as_str()) {
1333 return Err(ValidationError::new("protected_config").with_message(
1334 format!(
1335 "'{}' must be set via dedicated field, not custom config",
1336 key
1337 )
1338 .into(),
1339 ));
1340 }
1341 }
1342 Ok(())
1343}
1344
1345#[allow(dead_code)]
1347#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1348#[serde(rename_all = "camelCase")]
1349pub struct TopicAcl {
1350 #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
1352 #[validate(custom(function = "validate_principal"))]
1353 pub principal: String,
1354
1355 #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
1357 #[validate(custom(function = "validate_operations"))]
1358 pub operations: Vec<String>,
1359
1360 #[serde(default = "default_permission_type")]
1362 #[validate(custom(function = "validate_permission_type"))]
1363 pub permission_type: String,
1364
1365 #[serde(default = "default_acl_host")]
1367 #[validate(length(max = 256, message = "host must be max 256 characters"))]
1368 pub host: String,
1369}
1370
1371#[allow(dead_code)]
1372fn default_permission_type() -> String {
1373 "Allow".to_string()
1374}
1375
1376#[allow(dead_code)]
1377fn default_acl_host() -> String {
1378 "*".to_string()
1379}
1380
1381#[allow(dead_code)]
1382fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1383 if principal == "*" {
1384 return Ok(());
1385 }
1386 if let Some((prefix, name)) = principal.split_once(':') {
1387 if !["user", "group", "User", "Group"].contains(&prefix) {
1388 return Err(ValidationError::new("invalid_principal_prefix")
1389 .with_message("principal prefix must be 'user:' or 'group:'".into()));
1390 }
1391 if name.is_empty() || name.len() > 128 {
1392 return Err(ValidationError::new("invalid_principal_name")
1393 .with_message("principal name must be 1-128 characters".into()));
1394 }
1395 Ok(())
1396 } else {
1397 Err(ValidationError::new("invalid_principal_format")
1398 .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1399 }
1400}
1401
1402fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1403 let valid_ops = [
1404 "Read",
1405 "Write",
1406 "Create",
1407 "Delete",
1408 "Alter",
1409 "Describe",
1410 "All",
1411 "DescribeConfigs",
1412 "AlterConfigs",
1413 ];
1414 for op in ops {
1415 if !valid_ops.contains(&op.as_str()) {
1416 return Err(ValidationError::new("invalid_operation").with_message(
1417 format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1418 ));
1419 }
1420 }
1421 Ok(())
1422}
1423
1424#[allow(dead_code)]
1425fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1426 match perm {
1427 "Allow" | "Deny" => Ok(()),
1428 _ => Err(ValidationError::new("invalid_permission_type")
1429 .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1430 }
1431}
1432
1433#[allow(dead_code)]
1434fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1435 let mut seen = std::collections::HashSet::new();
1437 for acl in acls {
1438 for op in &acl.operations {
1439 let key = format!("{}:{}", acl.principal, op);
1440 if !seen.insert(key.clone()) {
1441 return Err(ValidationError::new("duplicate_acl")
1442 .with_message(format!("duplicate ACL entry for {}", key).into()));
1443 }
1444 }
1445 }
1446 Ok(())
1447}
1448
1449#[allow(dead_code)]
1451#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1452#[serde(rename_all = "camelCase")]
1453pub struct RivvenTopicStatus {
1454 #[serde(default)]
1456 pub phase: String,
1457
1458 #[serde(default)]
1460 pub message: String,
1461
1462 #[serde(default)]
1464 pub current_partitions: i32,
1465
1466 #[serde(default)]
1468 pub current_replication_factor: i32,
1469
1470 #[serde(default)]
1472 pub topic_exists: bool,
1473
1474 #[serde(default)]
1476 pub observed_generation: i64,
1477
1478 #[serde(default)]
1480 pub conditions: Vec<TopicCondition>,
1481
1482 #[serde(default)]
1484 pub last_sync_time: Option<String>,
1485
1486 #[serde(default)]
1488 pub partition_info: Vec<PartitionInfo>,
1489}
1490
1491#[allow(dead_code)]
1493#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1494#[serde(rename_all = "camelCase")]
1495pub struct TopicCondition {
1496 pub r#type: String,
1498
1499 pub status: String,
1501
1502 pub reason: String,
1504
1505 pub message: String,
1507
1508 pub last_transition_time: String,
1510}
1511
1512#[allow(dead_code)]
1514#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1515#[serde(rename_all = "camelCase")]
1516pub struct PartitionInfo {
1517 pub partition: i32,
1519
1520 pub leader: i32,
1522
1523 pub replicas: Vec<i32>,
1525
1526 pub isr: Vec<i32>,
1528}
1529
1530#[allow(dead_code)]
1532#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1533#[serde(rename_all = "camelCase")]
1534pub struct ConnectConfigSpec {
1535 #[serde(default = "default_state_dir")]
1537 pub state_dir: String,
1538
1539 #[serde(default = "default_log_level")]
1541 #[validate(custom(function = "validate_log_level"))]
1542 pub log_level: String,
1543}
1544
1545#[allow(dead_code)]
1546fn default_state_dir() -> String {
1547 "/data/connect-state".to_string()
1548}
1549
1550#[allow(dead_code)]
1551fn default_log_level() -> String {
1552 "info".to_string()
1553}
1554
1555#[allow(dead_code)]
1556fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1557 match level {
1558 "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1559 _ => Err(ValidationError::new("invalid_log_level")
1560 .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1561 }
1562}
1563
1564#[allow(dead_code)]
1570#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1571#[serde(rename_all = "camelCase")]
1572pub struct SourceConnectorSpec {
1573 #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
1575 #[validate(custom(function = "validate_k8s_name"))]
1576 pub name: String,
1577
1578 #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
1580 #[validate(custom(function = "validate_connector_type"))]
1581 pub connector: String,
1582
1583 #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
1585 pub topic: String,
1586
1587 #[serde(default)]
1592 pub topic_routing: Option<String>,
1593
1594 #[serde(default = "default_true")]
1596 pub enabled: bool,
1597
1598 #[serde(default)]
1602 pub config: serde_json::Value,
1603
1604 #[serde(default)]
1607 #[validate(custom(function = "validate_optional_k8s_name"))]
1608 pub config_secret_ref: Option<String>,
1609
1610 #[serde(default)]
1612 #[validate(nested)]
1613 pub topic_config: SourceTopicConfigSpec,
1614}
1615
1616#[allow(dead_code)]
1617fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1618 static CONNECTOR_REGEX: LazyLock<Regex> =
1620 LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1621 if !CONNECTOR_REGEX.is_match(connector) {
1622 return Err(ValidationError::new("invalid_connector_type")
1623 .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1624 }
1625 Ok(())
1626}
1627
1628#[allow(dead_code)]
1630#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1631#[serde(rename_all = "camelCase")]
1632pub struct TableSpec {
1633 #[serde(default)]
1635 pub schema: Option<String>,
1636
1637 #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
1639 pub table: String,
1640
1641 #[serde(default)]
1643 pub topic: Option<String>,
1644
1645 #[serde(default)]
1647 #[validate(length(max = 500, message = "maximum 500 columns per table"))]
1648 pub columns: Vec<String>,
1649
1650 #[serde(default)]
1652 #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
1653 pub exclude_columns: Vec<String>,
1654
1655 #[serde(default)]
1657 pub column_masks: std::collections::BTreeMap<String, String>,
1658}
1659
1660#[allow(dead_code)]
1662#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1663#[serde(rename_all = "camelCase")]
1664pub struct SourceTopicConfigSpec {
1665 #[serde(default)]
1667 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
1668 pub partitions: Option<i32>,
1669
1670 #[serde(default)]
1672 #[validate(range(
1673 min = 1,
1674 max = 10,
1675 message = "replication factor must be between 1 and 10"
1676 ))]
1677 pub replication_factor: Option<i32>,
1678
1679 #[serde(default)]
1681 pub auto_create: Option<bool>,
1682}
1683
1684#[allow(dead_code)]
1690#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1691#[serde(rename_all = "camelCase")]
1692pub struct SinkConnectorSpec {
1693 #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
1695 #[validate(custom(function = "validate_k8s_name"))]
1696 pub name: String,
1697
1698 #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
1700 #[validate(custom(function = "validate_connector_type"))]
1701 pub connector: String,
1702
1703 #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
1705 pub topics: Vec<String>,
1706
1707 #[validate(length(
1709 min = 1,
1710 max = 128,
1711 message = "consumer group must be 1-128 characters"
1712 ))]
1713 pub consumer_group: String,
1714
1715 #[serde(default = "default_true")]
1717 pub enabled: bool,
1718
1719 #[serde(default = "default_start_offset")]
1721 #[validate(custom(function = "validate_start_offset"))]
1722 pub start_offset: String,
1723
1724 #[serde(default)]
1728 pub config: serde_json::Value,
1729
1730 #[serde(default)]
1733 #[validate(custom(function = "validate_optional_k8s_name"))]
1734 pub config_secret_ref: Option<String>,
1735
1736 #[serde(default)]
1738 #[validate(nested)]
1739 pub rate_limit: RateLimitSpec,
1740}
1741
1742#[allow(dead_code)]
1743fn default_start_offset() -> String {
1744 "latest".to_string()
1745}
1746
1747#[allow(dead_code)]
1748fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
1749 match offset {
1750 "earliest" | "latest" => Ok(()),
1751 s if s.contains('T') && s.contains(':') => Ok(()), _ => Err(ValidationError::new("invalid_start_offset").with_message(
1753 "start offset must be 'earliest', 'latest', or ISO 8601 timestamp".into(),
1754 )),
1755 }
1756}
1757
1758#[allow(dead_code)]
1760#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1761#[serde(rename_all = "camelCase")]
1762pub struct RateLimitSpec {
1763 #[serde(default)]
1765 #[validate(range(
1766 min = 0,
1767 max = 1_000_000,
1768 message = "events per second must be 0-1000000"
1769 ))]
1770 pub events_per_second: u64,
1771
1772 #[serde(default)]
1774 #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
1775 pub burst_capacity: Option<u64>,
1776}
1777
1778#[allow(dead_code)]
1780#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1781#[serde(rename_all = "camelCase")]
1782pub struct GlobalConnectSettings {
1783 #[serde(default)]
1785 #[validate(nested)]
1786 pub topic: TopicSettingsSpec,
1787
1788 #[serde(default)]
1790 #[validate(nested)]
1791 pub retry: RetryConfigSpec,
1792
1793 #[serde(default)]
1795 #[validate(nested)]
1796 pub health: HealthConfigSpec,
1797
1798 #[serde(default)]
1800 #[validate(nested)]
1801 pub metrics: ConnectMetricsSpec,
1802}
1803
1804#[allow(dead_code)]
1806#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1807#[serde(rename_all = "camelCase")]
1808pub struct TopicSettingsSpec {
1809 #[serde(default = "default_true")]
1811 pub auto_create: bool,
1812
1813 #[serde(default = "default_topic_partitions")]
1815 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
1816 pub default_partitions: i32,
1817
1818 #[serde(default = "default_topic_replication")]
1820 #[validate(range(
1821 min = 1,
1822 max = 10,
1823 message = "replication factor must be between 1 and 10"
1824 ))]
1825 pub default_replication_factor: i32,
1826
1827 #[serde(default = "default_true")]
1829 pub require_topic_exists: bool,
1830}
1831
1832#[allow(dead_code)]
1833fn default_topic_partitions() -> i32 {
1834 1
1835}
1836
1837#[allow(dead_code)]
1838fn default_topic_replication() -> i32 {
1839 1
1840}
1841
1842impl Default for TopicSettingsSpec {
1843 fn default() -> Self {
1844 Self {
1845 auto_create: true,
1846 default_partitions: 1,
1847 default_replication_factor: 1,
1848 require_topic_exists: true,
1849 }
1850 }
1851}
1852
1853#[allow(dead_code)]
1855#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1856#[serde(rename_all = "camelCase")]
1857pub struct RetryConfigSpec {
1858 #[serde(default = "default_max_retries")]
1860 #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
1861 pub max_retries: i32,
1862
1863 #[serde(default = "default_initial_backoff_ms")]
1865 #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
1866 pub initial_backoff_ms: i64,
1867
1868 #[serde(default = "default_max_backoff_ms")]
1870 #[validate(range(
1871 min = 100,
1872 max = 3600000,
1873 message = "max backoff must be 100-3600000ms"
1874 ))]
1875 pub max_backoff_ms: i64,
1876
1877 #[serde(default = "default_backoff_multiplier")]
1879 pub backoff_multiplier: f64,
1880}
1881
1882#[allow(dead_code)]
1883fn default_max_retries() -> i32 {
1884 10
1885}
1886
1887#[allow(dead_code)]
1888fn default_initial_backoff_ms() -> i64 {
1889 100
1890}
1891
1892#[allow(dead_code)]
1893fn default_max_backoff_ms() -> i64 {
1894 30000
1895}
1896
1897#[allow(dead_code)]
1898fn default_backoff_multiplier() -> f64 {
1899 2.0
1900}
1901
1902impl Default for RetryConfigSpec {
1903 fn default() -> Self {
1904 Self {
1905 max_retries: 10,
1906 initial_backoff_ms: 100,
1907 max_backoff_ms: 30000,
1908 backoff_multiplier: 2.0,
1909 }
1910 }
1911}
1912
1913#[allow(dead_code)]
1915#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1916#[serde(rename_all = "camelCase")]
1917pub struct HealthConfigSpec {
1918 #[serde(default)]
1920 pub enabled: bool,
1921
1922 #[serde(default = "default_health_port")]
1924 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
1925 pub port: i32,
1926
1927 #[serde(default = "default_health_path")]
1929 pub path: String,
1930}
1931
1932#[allow(dead_code)]
1933fn default_health_port() -> i32 {
1934 8080
1935}
1936
1937#[allow(dead_code)]
1938fn default_health_path() -> String {
1939 "/health".to_string()
1940}
1941
1942impl Default for HealthConfigSpec {
1943 fn default() -> Self {
1944 Self {
1945 enabled: false,
1946 port: 8080,
1947 path: "/health".to_string(),
1948 }
1949 }
1950}
1951
1952#[allow(dead_code)]
1954#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1955#[serde(rename_all = "camelCase")]
1956pub struct ConnectMetricsSpec {
1957 #[serde(default)]
1959 pub enabled: bool,
1960
1961 #[serde(default = "default_connect_metrics_port")]
1963 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
1964 pub port: i32,
1965}
1966
1967#[allow(dead_code)]
1968fn default_connect_metrics_port() -> i32 {
1969 9091
1970}
1971
1972impl Default for ConnectMetricsSpec {
1973 fn default() -> Self {
1974 Self {
1975 enabled: false,
1976 port: 9091,
1977 }
1978 }
1979}
1980
1981#[allow(dead_code)]
1983#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1984#[serde(rename_all = "camelCase")]
1985pub struct ConnectTlsSpec {
1986 #[serde(default)]
1988 pub enabled: bool,
1989
1990 #[serde(default)]
1992 #[validate(custom(function = "validate_optional_k8s_name"))]
1993 pub cert_secret_name: Option<String>,
1994
1995 #[serde(default)]
1997 pub mtls_enabled: bool,
1998
1999 #[serde(default)]
2001 #[validate(custom(function = "validate_optional_k8s_name"))]
2002 pub ca_secret_name: Option<String>,
2003
2004 #[serde(default)]
2006 pub insecure: bool,
2007}
2008
2009#[allow(dead_code)]
2010fn default_connect_replicas() -> i32 {
2011 1
2012}
2013
2014#[allow(dead_code)]
2016#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
2017#[serde(rename_all = "camelCase")]
2018pub struct RivvenConnectStatus {
2019 pub phase: ConnectPhase,
2021
2022 pub replicas: i32,
2024
2025 pub ready_replicas: i32,
2027
2028 pub sources_running: i32,
2030
2031 pub sinks_running: i32,
2033
2034 pub sources_total: i32,
2036
2037 pub sinks_total: i32,
2039
2040 pub observed_generation: i64,
2042
2043 #[serde(default)]
2045 pub conditions: Vec<ConnectCondition>,
2046
2047 #[serde(default)]
2049 pub connector_statuses: Vec<ConnectorStatus>,
2050
2051 pub last_updated: Option<String>,
2053
2054 pub message: Option<String>,
2056}
2057
2058#[allow(dead_code)]
2060#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
2061pub enum ConnectPhase {
2062 #[default]
2064 Pending,
2065 Starting,
2067 Running,
2069 Degraded,
2071 Failed,
2073 Terminating,
2075}
2076
2077#[allow(dead_code)]
2079#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2080#[serde(rename_all = "camelCase")]
2081pub struct ConnectCondition {
2082 #[serde(rename = "type")]
2084 pub condition_type: String,
2085
2086 pub status: String,
2088
2089 pub reason: Option<String>,
2091
2092 pub message: Option<String>,
2094
2095 pub last_transition_time: Option<String>,
2097}
2098
2099#[allow(dead_code)]
2101#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2102#[serde(rename_all = "camelCase")]
2103pub struct ConnectorStatus {
2104 pub name: String,
2106
2107 pub connector_type: String,
2109
2110 pub kind: String,
2112
2113 pub state: String,
2115
2116 pub events_processed: i64,
2118
2119 pub last_error: Option<String>,
2121
2122 pub last_success_time: Option<String>,
2124}
2125
2126#[allow(dead_code)]
2127impl RivvenConnectSpec {
2128 pub fn get_image(&self) -> String {
2130 if let Some(ref image) = self.image {
2131 image.clone()
2132 } else {
2133 format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2134 }
2135 }
2136
2137 pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2139 let mut labels = BTreeMap::new();
2140 labels.insert(
2141 "app.kubernetes.io/name".to_string(),
2142 "rivven-connect".to_string(),
2143 );
2144 labels.insert(
2145 "app.kubernetes.io/instance".to_string(),
2146 connect_name.to_string(),
2147 );
2148 labels.insert(
2149 "app.kubernetes.io/component".to_string(),
2150 "connector".to_string(),
2151 );
2152 labels.insert(
2153 "app.kubernetes.io/managed-by".to_string(),
2154 "rivven-operator".to_string(),
2155 );
2156 labels.insert(
2157 "app.kubernetes.io/version".to_string(),
2158 self.version.clone(),
2159 );
2160 labels
2161 }
2162
2163 pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2165 let mut labels = BTreeMap::new();
2166 labels.insert(
2167 "app.kubernetes.io/name".to_string(),
2168 "rivven-connect".to_string(),
2169 );
2170 labels.insert(
2171 "app.kubernetes.io/instance".to_string(),
2172 connect_name.to_string(),
2173 );
2174 labels
2175 }
2176
2177 pub fn enabled_sources_count(&self) -> usize {
2179 self.sources.iter().filter(|s| s.enabled).count()
2180 }
2181
2182 pub fn enabled_sinks_count(&self) -> usize {
2184 self.sinks.iter().filter(|s| s.enabled).count()
2185 }
2186}
2187
2188#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2195#[serde(rename_all = "camelCase")]
2196pub struct SnapshotCdcConfigSpec {
2197 #[serde(default = "default_snapshot_batch_size")]
2199 #[validate(range(min = 100, max = 1000000, message = "batch size must be 100-1000000"))]
2200 pub batch_size: i32,
2201
2202 #[serde(default = "default_snapshot_parallel_tables")]
2204 #[validate(range(min = 1, max = 32, message = "parallel tables must be 1-32"))]
2205 pub parallel_tables: i32,
2206
2207 #[serde(default = "default_snapshot_query_timeout")]
2209 #[validate(range(min = 10, max = 3600, message = "query timeout must be 10-3600s"))]
2210 pub query_timeout_secs: i32,
2211
2212 #[serde(default)]
2214 #[validate(range(min = 0, max = 60000, message = "throttle delay must be 0-60000ms"))]
2215 pub throttle_delay_ms: i32,
2216
2217 #[serde(default = "default_snapshot_max_retries")]
2219 #[validate(range(min = 0, max = 10, message = "max retries must be 0-10"))]
2220 pub max_retries: i32,
2221
2222 #[serde(default)]
2224 pub include_tables: Vec<String>,
2225
2226 #[serde(default)]
2228 pub exclude_tables: Vec<String>,
2229}
2230
2231fn default_snapshot_batch_size() -> i32 {
2232 10_000
2233}
2234fn default_snapshot_parallel_tables() -> i32 {
2235 4
2236}
2237fn default_snapshot_query_timeout() -> i32 {
2238 300
2239}
2240fn default_snapshot_max_retries() -> i32 {
2241 3
2242}
2243
2244#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2246#[serde(rename_all = "camelCase")]
2247pub struct IncrementalSnapshotSpec {
2248 #[serde(default)]
2250 pub enabled: bool,
2251
2252 #[serde(default = "default_incremental_chunk_size")]
2254 #[validate(range(min = 100, max = 100000, message = "chunk size must be 100-100000"))]
2255 pub chunk_size: i32,
2256
2257 #[serde(default)]
2259 #[validate(custom(function = "validate_watermark_strategy"))]
2260 pub watermark_strategy: String,
2261
2262 #[serde(default)]
2264 pub watermark_signal_table: Option<String>,
2265
2266 #[serde(default = "default_incremental_max_chunks")]
2268 #[validate(range(min = 1, max = 16, message = "max concurrent chunks must be 1-16"))]
2269 pub max_concurrent_chunks: i32,
2270
2271 #[serde(default)]
2273 #[validate(range(min = 0, max = 60000, message = "chunk delay must be 0-60000ms"))]
2274 pub chunk_delay_ms: i32,
2275}
2276
2277fn default_incremental_chunk_size() -> i32 {
2278 1024
2279}
2280fn default_incremental_max_chunks() -> i32 {
2281 1
2282}
2283
2284fn validate_watermark_strategy(strategy: &str) -> Result<(), ValidationError> {
2285 match strategy {
2286 "" | "insert" | "update_and_insert" => Ok(()),
2287 _ => Err(ValidationError::new("invalid_watermark_strategy")
2288 .with_message("watermark strategy must be: insert or update_and_insert".into())),
2289 }
2290}
2291
2292#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2294#[serde(rename_all = "camelCase")]
2295pub struct SignalTableSpec {
2296 #[serde(default)]
2298 pub enabled: bool,
2299
2300 #[serde(default)]
2302 pub data_collection: Option<String>,
2303
2304 #[serde(default)]
2306 pub topic: Option<String>,
2307
2308 #[serde(default)]
2310 pub enabled_channels: Vec<String>,
2311
2312 #[serde(default = "default_signal_poll_interval")]
2314 pub poll_interval_ms: i32,
2315}
2316
2317fn default_signal_poll_interval() -> i32 {
2318 1000
2319}
2320
2321#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2323#[serde(rename_all = "camelCase")]
2324pub struct HeartbeatCdcSpec {
2325 #[serde(default)]
2327 pub enabled: bool,
2328
2329 #[serde(default = "default_heartbeat_cdc_interval")]
2331 #[validate(range(min = 1, max = 3600, message = "heartbeat interval must be 1-3600s"))]
2332 pub interval_secs: i32,
2333
2334 #[serde(default = "default_heartbeat_max_lag")]
2336 #[validate(range(min = 10, max = 86400, message = "max lag must be 10-86400s"))]
2337 pub max_lag_secs: i32,
2338
2339 #[serde(default)]
2341 pub emit_events: bool,
2342
2343 #[serde(default)]
2345 pub topic: Option<String>,
2346
2347 #[serde(default)]
2349 pub action_query: Option<String>,
2350}
2351
2352fn default_heartbeat_cdc_interval() -> i32 {
2353 10
2354}
2355fn default_heartbeat_max_lag() -> i32 {
2356 300
2357}
2358
2359#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2361#[serde(rename_all = "camelCase")]
2362pub struct DeduplicationCdcSpec {
2363 #[serde(default)]
2365 pub enabled: bool,
2366
2367 #[serde(default = "default_bloom_expected")]
2369 #[validate(range(
2370 min = 1000,
2371 max = 10000000,
2372 message = "bloom expected must be 1000-10M"
2373 ))]
2374 pub bloom_expected_insertions: i64,
2375
2376 #[serde(default = "default_bloom_fpp")]
2378 pub bloom_fpp: f64,
2379
2380 #[serde(default = "default_lru_size")]
2382 #[validate(range(min = 1000, max = 1000000, message = "LRU size must be 1000-1M"))]
2383 pub lru_size: i64,
2384
2385 #[serde(default = "default_dedup_window")]
2387 #[validate(range(min = 60, max = 604800, message = "window must be 60-604800s"))]
2388 pub window_secs: i64,
2389}
2390
2391fn default_bloom_expected() -> i64 {
2392 100_000
2393}
2394fn default_bloom_fpp() -> f64 {
2395 0.01
2396}
2397fn default_lru_size() -> i64 {
2398 10_000
2399}
2400fn default_dedup_window() -> i64 {
2401 3600
2402}
2403
2404#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2406#[serde(rename_all = "camelCase")]
2407pub struct TransactionTopicSpec {
2408 #[serde(default)]
2410 pub enabled: bool,
2411
2412 #[serde(default)]
2414 pub topic_name: Option<String>,
2415
2416 #[serde(default = "default_true_cdc")]
2418 pub include_data_collections: bool,
2419
2420 #[serde(default)]
2422 #[validate(range(min = 0, max = 10000, message = "min events must be 0-10000"))]
2423 pub min_events_threshold: i32,
2424}
2425
2426fn default_true_cdc() -> bool {
2427 true
2428}
2429
2430#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2432#[serde(rename_all = "camelCase")]
2433pub struct SchemaChangeTopicSpec {
2434 #[serde(default)]
2436 pub enabled: bool,
2437
2438 #[serde(default)]
2440 pub topic_name: Option<String>,
2441
2442 #[serde(default = "default_true_cdc")]
2444 pub include_columns: bool,
2445
2446 #[serde(default)]
2448 pub schemas: Vec<String>,
2449}
2450
2451#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2453#[serde(rename_all = "camelCase")]
2454pub struct TombstoneCdcSpec {
2455 #[serde(default)]
2457 pub enabled: bool,
2458
2459 #[serde(default = "default_true_cdc")]
2461 pub after_delete: bool,
2462
2463 #[serde(default)]
2465 #[validate(custom(function = "validate_tombstone_behavior"))]
2466 pub behavior: String,
2467}
2468
2469fn validate_tombstone_behavior(behavior: &str) -> Result<(), ValidationError> {
2470 match behavior {
2471 "" | "emit_null" | "emit_with_key" => Ok(()),
2472 _ => Err(ValidationError::new("invalid_tombstone_behavior")
2473 .with_message("tombstone behavior must be: emit_null or emit_with_key".into())),
2474 }
2475}
2476
2477#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2479#[serde(rename_all = "camelCase")]
2480pub struct FieldEncryptionSpec {
2481 #[serde(default)]
2483 pub enabled: bool,
2484
2485 #[serde(default)]
2487 #[validate(custom(function = "validate_optional_k8s_name"))]
2488 pub key_secret_ref: Option<String>,
2489
2490 #[serde(default)]
2492 pub fields: Vec<String>,
2493
2494 #[serde(default = "default_encryption_algorithm")]
2496 pub algorithm: String,
2497}
2498
2499fn default_encryption_algorithm() -> String {
2500 "aes-256-gcm".to_string()
2501}
2502
2503#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2505#[serde(rename_all = "camelCase")]
2506pub struct ReadOnlyReplicaSpec {
2507 #[serde(default)]
2509 pub enabled: bool,
2510
2511 #[serde(default = "default_lag_threshold")]
2513 #[validate(range(
2514 min = 100,
2515 max = 300000,
2516 message = "lag threshold must be 100-300000ms"
2517 ))]
2518 pub lag_threshold_ms: i64,
2519
2520 #[serde(default = "default_true_cdc")]
2522 pub deduplicate: bool,
2523
2524 #[serde(default)]
2526 #[validate(custom(function = "validate_watermark_source"))]
2527 pub watermark_source: String,
2528}
2529
2530fn default_lag_threshold() -> i64 {
2531 5000
2532}
2533
2534fn validate_watermark_source(source: &str) -> Result<(), ValidationError> {
2535 match source {
2536 "" | "primary" | "replica" => Ok(()),
2537 _ => Err(ValidationError::new("invalid_watermark_source")
2538 .with_message("watermark source must be: primary or replica".into())),
2539 }
2540}
2541
2542#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2544#[serde(rename_all = "camelCase")]
2545pub struct EventRouterSpec {
2546 #[serde(default)]
2548 pub enabled: bool,
2549
2550 #[serde(default)]
2552 pub default_destination: Option<String>,
2553
2554 #[serde(default)]
2556 pub dead_letter_queue: Option<String>,
2557
2558 #[serde(default)]
2560 pub drop_unroutable: bool,
2561
2562 #[serde(default)]
2564 #[validate(nested)]
2565 pub rules: Vec<RouteRuleSpec>,
2566}
2567
2568#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2570#[serde(rename_all = "camelCase")]
2571pub struct RouteRuleSpec {
2572 #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
2574 pub name: String,
2575
2576 #[serde(default)]
2578 pub priority: i32,
2579
2580 #[validate(custom(function = "validate_route_condition_type"))]
2582 pub condition_type: String,
2583
2584 #[serde(default)]
2586 pub condition_value: Option<String>,
2587
2588 #[serde(default)]
2590 pub condition_value2: Option<String>,
2591
2592 pub destinations: Vec<String>,
2594
2595 #[serde(default)]
2597 pub continue_matching: bool,
2598}
2599
2600fn validate_route_condition_type(condition: &str) -> Result<(), ValidationError> {
2601 match condition {
2602 "always" | "table" | "table_pattern" | "schema" | "operation" | "field_equals"
2603 | "field_exists" => Ok(()),
2604 _ => Err(ValidationError::new("invalid_route_condition_type").with_message(
2605 "condition type must be: always, table, table_pattern, schema, operation, field_equals, or field_exists".into(),
2606 )),
2607 }
2608}
2609
2610#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2612#[serde(rename_all = "camelCase")]
2613pub struct PartitionerSpec {
2614 #[serde(default)]
2616 pub enabled: bool,
2617
2618 #[serde(default = "default_num_partitions")]
2620 #[validate(range(min = 1, max = 10000, message = "num partitions must be 1-10000"))]
2621 pub num_partitions: i32,
2622
2623 #[serde(default = "default_partition_strategy")]
2625 #[validate(custom(function = "validate_partition_strategy"))]
2626 pub strategy: String,
2627
2628 #[serde(default)]
2630 pub key_columns: Vec<String>,
2631
2632 #[serde(default)]
2634 pub sticky_partition: Option<i32>,
2635}
2636
2637fn default_num_partitions() -> i32 {
2638 16
2639}
2640fn default_partition_strategy() -> String {
2641 "key_hash".to_string()
2642}
2643
2644fn validate_partition_strategy(strategy: &str) -> Result<(), ValidationError> {
2645 match strategy {
2646 "round_robin" | "key_hash" | "table_hash" | "full_table_hash" | "sticky" => Ok(()),
2647 _ => Err(ValidationError::new("invalid_partition_strategy").with_message(
2648 "partition strategy must be: round_robin, key_hash, table_hash, full_table_hash, or sticky".into(),
2649 )),
2650 }
2651}
2652
2653#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2655#[serde(rename_all = "camelCase")]
2656pub struct SmtTransformSpec {
2657 #[validate(custom(function = "validate_smt_transform_type"))]
2659 pub transform_type: String,
2660
2661 #[serde(default)]
2663 pub name: Option<String>,
2664
2665 #[serde(default)]
2667 pub config: serde_json::Value,
2668}
2669
2670fn validate_smt_transform_type(transform: &str) -> Result<(), ValidationError> {
2671 match transform {
2672 "extract_new_record_state" | "value_to_key" | "timestamp_converter"
2673 | "timezone_converter" | "mask_field" | "filter" | "flatten" | "insert_field"
2674 | "rename_field" | "replace_field" | "cast" | "regex_router" | "content_router"
2675 | "header_to_value" | "unwrap" | "set_null" | "compute_field" | "conditional_smt" => Ok(()),
2676 _ => Err(ValidationError::new("invalid_smt_transform_type").with_message(
2677 "transform type must be a valid SMT (extract_new_record_state, mask_field, filter, etc.)".into(),
2678 )),
2679 }
2680}
2681
2682#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2684#[serde(rename_all = "camelCase")]
2685pub struct ParallelCdcSpec {
2686 #[serde(default)]
2688 pub enabled: bool,
2689
2690 #[serde(default = "default_parallel_concurrency")]
2692 #[validate(range(min = 1, max = 64, message = "concurrency must be 1-64"))]
2693 pub concurrency: i32,
2694
2695 #[serde(default = "default_per_table_buffer")]
2697 #[validate(range(
2698 min = 100,
2699 max = 100000,
2700 message = "per table buffer must be 100-100000"
2701 ))]
2702 pub per_table_buffer: i32,
2703
2704 #[serde(default = "default_output_buffer")]
2706 #[validate(range(min = 1000, max = 1000000, message = "output buffer must be 1000-1M"))]
2707 pub output_buffer: i32,
2708
2709 #[serde(default = "default_true_cdc")]
2711 pub work_stealing: bool,
2712
2713 #[serde(default)]
2715 pub per_table_rate_limit: Option<i64>,
2716
2717 #[serde(default = "default_shutdown_timeout")]
2719 #[validate(range(min = 1, max = 300, message = "shutdown timeout must be 1-300s"))]
2720 pub shutdown_timeout_secs: i32,
2721}
2722
2723fn default_parallel_concurrency() -> i32 {
2724 4
2725}
2726fn default_per_table_buffer() -> i32 {
2727 1000
2728}
2729fn default_output_buffer() -> i32 {
2730 10_000
2731}
2732fn default_shutdown_timeout() -> i32 {
2733 30
2734}
2735
2736#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2738#[serde(rename_all = "camelCase")]
2739pub struct OutboxSpec {
2740 #[serde(default)]
2742 pub enabled: bool,
2743
2744 #[serde(default = "default_outbox_table")]
2746 #[validate(length(min = 1, max = 128, message = "outbox table must be 1-128 characters"))]
2747 pub table_name: String,
2748
2749 #[serde(default = "default_outbox_poll_interval")]
2751 #[validate(range(min = 10, max = 60000, message = "poll interval must be 10-60000ms"))]
2752 pub poll_interval_ms: i32,
2753
2754 #[serde(default = "default_outbox_batch_size")]
2756 #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
2757 pub batch_size: i32,
2758
2759 #[serde(default = "default_outbox_max_retries")]
2761 #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2762 pub max_retries: i32,
2763
2764 #[serde(default = "default_outbox_timeout")]
2766 #[validate(range(min = 1, max = 300, message = "timeout must be 1-300s"))]
2767 pub delivery_timeout_secs: i32,
2768
2769 #[serde(default = "default_true_cdc")]
2771 pub ordered_delivery: bool,
2772
2773 #[serde(default = "default_outbox_retention")]
2775 #[validate(range(min = 60, max = 604800, message = "retention must be 60-604800s"))]
2776 pub retention_secs: i64,
2777
2778 #[serde(default = "default_outbox_concurrency")]
2780 #[validate(range(min = 1, max = 100, message = "concurrency must be 1-100"))]
2781 pub max_concurrency: i32,
2782}
2783
2784fn default_outbox_table() -> String {
2785 "outbox".to_string()
2786}
2787fn default_outbox_poll_interval() -> i32 {
2788 100
2789}
2790fn default_outbox_batch_size() -> i32 {
2791 100
2792}
2793fn default_outbox_max_retries() -> i32 {
2794 3
2795}
2796fn default_outbox_timeout() -> i32 {
2797 30
2798}
2799fn default_outbox_retention() -> i64 {
2800 86400
2801}
2802fn default_outbox_concurrency() -> i32 {
2803 10
2804}
2805
2806#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2808#[serde(rename_all = "camelCase")]
2809pub struct HealthMonitorSpec {
2810 #[serde(default)]
2812 pub enabled: bool,
2813
2814 #[serde(default = "default_health_check_interval")]
2816 #[validate(range(min = 1, max = 300, message = "check interval must be 1-300s"))]
2817 pub check_interval_secs: i32,
2818
2819 #[serde(default = "default_health_max_lag")]
2821 #[validate(range(min = 1000, max = 3600000, message = "max lag must be 1000-3600000ms"))]
2822 pub max_lag_ms: i64,
2823
2824 #[serde(default = "default_health_failure_threshold")]
2826 #[validate(range(min = 1, max = 10, message = "failure threshold must be 1-10"))]
2827 pub failure_threshold: i32,
2828
2829 #[serde(default = "default_health_success_threshold")]
2831 #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
2832 pub success_threshold: i32,
2833
2834 #[serde(default = "default_health_check_timeout")]
2836 #[validate(range(min = 1, max = 60, message = "check timeout must be 1-60s"))]
2837 pub check_timeout_secs: i32,
2838
2839 #[serde(default = "default_true_cdc")]
2841 pub auto_recovery: bool,
2842
2843 #[serde(default = "default_health_recovery_delay")]
2845 #[validate(range(min = 1, max = 300, message = "recovery delay must be 1-300s"))]
2846 pub recovery_delay_secs: i32,
2847
2848 #[serde(default = "default_health_max_recovery_delay")]
2850 #[validate(range(min = 1, max = 3600, message = "max recovery delay must be 1-3600s"))]
2851 pub max_recovery_delay_secs: i32,
2852}
2853
2854fn default_health_check_interval() -> i32 {
2855 10
2856}
2857fn default_health_max_lag() -> i64 {
2858 30_000
2859}
2860fn default_health_failure_threshold() -> i32 {
2861 3
2862}
2863fn default_health_success_threshold() -> i32 {
2864 2
2865}
2866fn default_health_check_timeout() -> i32 {
2867 5
2868}
2869fn default_health_recovery_delay() -> i32 {
2870 1
2871}
2872fn default_health_max_recovery_delay() -> i32 {
2873 60
2874}
2875
2876#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2914#[kube(
2915 group = "rivven.hupe1980.github.io",
2916 version = "v1alpha1",
2917 kind = "RivvenSchemaRegistry",
2918 plural = "rivvenschemaregistries",
2919 shortname = "rsr",
2920 namespaced,
2921 status = "RivvenSchemaRegistryStatus",
2922 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
2923 printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
2924 printcolumn = r#"{"name":"Ready","type":"integer","jsonPath":".status.readyReplicas"}"#,
2925 printcolumn = r#"{"name":"Schemas","type":"integer","jsonPath":".status.schemasRegistered"}"#,
2926 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
2927 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
2928)]
2929#[serde(rename_all = "camelCase")]
2930pub struct RivvenSchemaRegistrySpec {
2931 #[validate(nested)]
2933 pub cluster_ref: ClusterReference,
2934
2935 #[serde(default = "default_schema_registry_replicas")]
2937 #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
2938 pub replicas: i32,
2939
2940 #[serde(default = "default_version")]
2942 pub version: String,
2943
2944 #[serde(default)]
2946 #[validate(custom(function = "validate_optional_image"))]
2947 pub image: Option<String>,
2948
2949 #[serde(default = "default_image_pull_policy")]
2951 #[validate(custom(function = "validate_pull_policy"))]
2952 pub image_pull_policy: String,
2953
2954 #[serde(default)]
2956 pub image_pull_secrets: Vec<String>,
2957
2958 #[serde(default)]
2960 pub resources: Option<serde_json::Value>,
2961
2962 #[serde(default)]
2964 #[validate(nested)]
2965 pub server: SchemaRegistryServerSpec,
2966
2967 #[serde(default)]
2969 #[validate(nested)]
2970 pub storage: SchemaRegistryStorageSpec,
2971
2972 #[serde(default)]
2974 #[validate(nested)]
2975 pub compatibility: SchemaCompatibilitySpec,
2976
2977 #[serde(default)]
2979 #[validate(nested)]
2980 pub schemas: SchemaFormatSpec,
2981
2982 #[serde(default)]
2984 #[validate(nested)]
2985 pub contexts: SchemaContextsSpec,
2986
2987 #[serde(default)]
2989 #[validate(nested)]
2990 pub validation: SchemaValidationSpec,
2991
2992 #[serde(default)]
2994 #[validate(nested)]
2995 pub auth: SchemaRegistryAuthSpec,
2996
2997 #[serde(default)]
2999 #[validate(nested)]
3000 pub tls: SchemaRegistryTlsSpec,
3001
3002 #[serde(default)]
3004 #[validate(nested)]
3005 pub metrics: SchemaRegistryMetricsSpec,
3006
3007 #[serde(default)]
3009 #[validate(nested)]
3010 pub external: ExternalRegistrySpec,
3011
3012 #[serde(default)]
3014 #[validate(custom(function = "validate_annotations"))]
3015 pub pod_annotations: BTreeMap<String, String>,
3016
3017 #[serde(default)]
3019 #[validate(custom(function = "validate_labels"))]
3020 pub pod_labels: BTreeMap<String, String>,
3021
3022 #[serde(default)]
3024 #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
3025 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
3026
3027 #[serde(default)]
3029 pub node_selector: BTreeMap<String, String>,
3030
3031 #[serde(default)]
3033 #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
3034 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
3035
3036 #[serde(default)]
3038 pub affinity: Option<serde_json::Value>,
3039
3040 #[serde(default)]
3042 #[validate(custom(function = "validate_optional_k8s_name"))]
3043 pub service_account: Option<String>,
3044
3045 #[serde(default)]
3047 pub security_context: Option<serde_json::Value>,
3048
3049 #[serde(default)]
3051 pub container_security_context: Option<serde_json::Value>,
3052
3053 #[serde(default)]
3055 #[validate(nested)]
3056 pub liveness_probe: ProbeSpec,
3057
3058 #[serde(default)]
3060 #[validate(nested)]
3061 pub readiness_probe: ProbeSpec,
3062
3063 #[serde(default)]
3065 #[validate(nested)]
3066 pub pod_disruption_budget: PdbSpec,
3067}
3068
3069fn default_schema_registry_replicas() -> i32 {
3070 1
3071}
3072
3073#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3075#[serde(rename_all = "camelCase")]
3076pub struct SchemaRegistryServerSpec {
3077 #[serde(default = "default_schema_registry_port")]
3079 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3080 pub port: i32,
3081
3082 #[serde(default = "default_bind_address")]
3084 pub bind_address: String,
3085
3086 #[serde(default = "default_request_timeout")]
3088 #[validate(range(min = 1, max = 300, message = "timeout must be 1-300 seconds"))]
3089 pub timeout_seconds: i32,
3090
3091 #[serde(default = "default_max_request_size")]
3093 #[validate(range(
3094 min = 1024,
3095 max = 104857600,
3096 message = "max request size must be 1KB-100MB"
3097 ))]
3098 pub max_request_size: i64,
3099
3100 #[serde(default)]
3102 pub cors_enabled: bool,
3103
3104 #[serde(default)]
3106 pub cors_allowed_origins: Vec<String>,
3107}
3108
3109fn default_schema_registry_port() -> i32 {
3110 8081
3111}
3112
3113fn default_bind_address() -> String {
3114 "0.0.0.0".to_string()
3115}
3116
3117fn default_request_timeout() -> i32 {
3118 30
3119}
3120
3121fn default_max_request_size() -> i64 {
3122 10_485_760 }
3124
3125impl Default for SchemaRegistryServerSpec {
3126 fn default() -> Self {
3127 Self {
3128 port: 8081,
3129 bind_address: "0.0.0.0".to_string(),
3130 timeout_seconds: 30,
3131 max_request_size: 10_485_760,
3132 cors_enabled: false,
3133 cors_allowed_origins: vec![],
3134 }
3135 }
3136}
3137
3138#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3140#[serde(rename_all = "camelCase")]
3141pub struct SchemaRegistryStorageSpec {
3142 #[serde(default = "default_storage_mode")]
3144 #[validate(custom(function = "validate_storage_mode"))]
3145 pub mode: String,
3146
3147 #[serde(default = "default_schema_topic")]
3149 #[validate(length(max = 255, message = "topic name max 255 characters"))]
3150 pub topic: String,
3151
3152 #[serde(default = "default_schema_topic_replication")]
3154 #[validate(range(min = 1, max = 10, message = "replication factor must be 1-10"))]
3155 pub replication_factor: i32,
3156
3157 #[serde(default = "default_schema_topic_partitions")]
3159 #[validate(range(min = 1, max = 100, message = "partitions must be 1-100"))]
3160 pub partitions: i32,
3161
3162 #[serde(default = "default_true")]
3164 pub normalize: bool,
3165}
3166
3167fn default_storage_mode() -> String {
3168 "broker".to_string()
3169}
3170
3171fn default_schema_topic() -> String {
3172 "_schemas".to_string()
3173}
3174
3175fn default_schema_topic_replication() -> i32 {
3176 3
3177}
3178
3179fn default_schema_topic_partitions() -> i32 {
3180 1
3181}
3182
3183fn validate_storage_mode(mode: &str) -> Result<(), ValidationError> {
3184 match mode {
3185 "memory" | "broker" => Ok(()),
3186 _ => Err(ValidationError::new("invalid_storage_mode")
3187 .with_message("storage mode must be 'memory' or 'broker'".into())),
3188 }
3189}
3190
3191impl Default for SchemaRegistryStorageSpec {
3192 fn default() -> Self {
3193 Self {
3194 mode: "broker".to_string(),
3195 topic: "_schemas".to_string(),
3196 replication_factor: 3,
3197 partitions: 1,
3198 normalize: true,
3199 }
3200 }
3201}
3202
3203#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3205#[serde(rename_all = "camelCase")]
3206pub struct SchemaCompatibilitySpec {
3207 #[serde(default = "default_compatibility_level")]
3209 #[validate(custom(function = "validate_compatibility_level"))]
3210 pub default_level: String,
3211
3212 #[serde(default = "default_true")]
3214 pub allow_overrides: bool,
3215
3216 #[serde(default)]
3218 #[validate(custom(function = "validate_subject_compatibility_map"))]
3219 pub subjects: BTreeMap<String, String>,
3220}
3221
3222fn default_compatibility_level() -> String {
3223 "BACKWARD".to_string()
3224}
3225
3226fn validate_compatibility_level(level: &str) -> Result<(), ValidationError> {
3227 let valid_levels = [
3228 "BACKWARD",
3229 "BACKWARD_TRANSITIVE",
3230 "FORWARD",
3231 "FORWARD_TRANSITIVE",
3232 "FULL",
3233 "FULL_TRANSITIVE",
3234 "NONE",
3235 ];
3236 if valid_levels.contains(&level) {
3237 Ok(())
3238 } else {
3239 Err(
3240 ValidationError::new("invalid_compatibility_level").with_message(
3241 format!(
3242 "'{}' is not valid. Must be one of: {:?}",
3243 level, valid_levels
3244 )
3245 .into(),
3246 ),
3247 )
3248 }
3249}
3250
3251fn validate_subject_compatibility_map(
3252 subjects: &BTreeMap<String, String>,
3253) -> Result<(), ValidationError> {
3254 if subjects.len() > 1000 {
3255 return Err(ValidationError::new("too_many_subjects")
3256 .with_message("maximum 1000 per-subject compatibility entries".into()));
3257 }
3258 for (subject, level) in subjects {
3259 if subject.len() > 255 {
3260 return Err(ValidationError::new("subject_name_too_long")
3261 .with_message(format!("subject '{}' exceeds 255 characters", subject).into()));
3262 }
3263 validate_compatibility_level(level)?;
3264 }
3265 Ok(())
3266}
3267
3268impl Default for SchemaCompatibilitySpec {
3269 fn default() -> Self {
3270 Self {
3271 default_level: "BACKWARD".to_string(),
3272 allow_overrides: true,
3273 subjects: BTreeMap::new(),
3274 }
3275 }
3276}
3277
3278#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3280#[serde(rename_all = "camelCase")]
3281pub struct SchemaFormatSpec {
3282 #[serde(default = "default_true")]
3284 pub avro: bool,
3285
3286 #[serde(default = "default_true")]
3288 pub json_schema: bool,
3289
3290 #[serde(default = "default_true")]
3292 pub protobuf: bool,
3293
3294 #[serde(default = "default_true")]
3296 pub strict_validation: bool,
3297}
3298
3299impl Default for SchemaFormatSpec {
3300 fn default() -> Self {
3301 Self {
3302 avro: true,
3303 json_schema: true,
3304 protobuf: true,
3305 strict_validation: true,
3306 }
3307 }
3308}
3309
3310#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3312#[serde(rename_all = "camelCase")]
3313pub struct SchemaContextsSpec {
3314 #[serde(default)]
3316 pub enabled: bool,
3317
3318 #[serde(default)]
3320 #[validate(range(min = 0, max = 10000, message = "max contexts must be 0-10000"))]
3321 pub max_contexts: i32,
3322
3323 #[serde(default)]
3325 #[validate(length(max = 100, message = "maximum 100 pre-defined contexts"))]
3326 pub predefined: Vec<SchemaContextDefinition>,
3327}
3328
3329#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3331#[serde(rename_all = "camelCase")]
3332pub struct SchemaContextDefinition {
3333 #[validate(length(min = 1, max = 128, message = "context name must be 1-128 characters"))]
3335 pub name: String,
3336
3337 #[serde(default)]
3339 #[validate(length(max = 512, message = "description max 512 characters"))]
3340 pub description: Option<String>,
3341
3342 #[serde(default = "default_true")]
3344 pub active: bool,
3345}
3346
3347#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3349#[serde(rename_all = "camelCase")]
3350pub struct SchemaValidationSpec {
3351 #[serde(default)]
3353 pub enabled: bool,
3354
3355 #[serde(default = "default_max_schema_size")]
3357 #[validate(range(
3358 min = 1024,
3359 max = 10485760,
3360 message = "max schema size must be 1KB-10MB"
3361 ))]
3362 pub max_schema_size: i64,
3363
3364 #[serde(default)]
3366 #[validate(length(max = 100, message = "maximum 100 validation rules"))]
3367 pub rules: Vec<SchemaValidationRule>,
3368}
3369
3370fn default_max_schema_size() -> i64 {
3371 1_048_576 }
3373
3374impl Default for SchemaValidationSpec {
3375 fn default() -> Self {
3376 Self {
3377 enabled: false,
3378 max_schema_size: 1_048_576,
3379 rules: vec![],
3380 }
3381 }
3382}
3383
3384#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3386#[serde(rename_all = "camelCase")]
3387pub struct SchemaValidationRule {
3388 #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
3390 pub name: String,
3391
3392 #[validate(custom(function = "validate_rule_type"))]
3394 pub rule_type: String,
3395
3396 #[validate(length(min = 1, max = 4096, message = "pattern must be 1-4096 characters"))]
3398 pub pattern: String,
3399
3400 #[serde(default)]
3402 pub subjects: Vec<String>,
3403
3404 #[serde(default)]
3406 pub schema_types: Vec<String>,
3407
3408 #[serde(default = "default_validation_level")]
3410 #[validate(custom(function = "validate_validation_level"))]
3411 pub level: String,
3412
3413 #[serde(default)]
3415 #[validate(length(max = 512, message = "description max 512 characters"))]
3416 pub description: Option<String>,
3417}
3418
3419fn validate_rule_type(rule_type: &str) -> Result<(), ValidationError> {
3420 let valid_types = [
3421 "regex",
3422 "field_exists",
3423 "field_type",
3424 "naming_convention",
3425 "documentation",
3426 ];
3427 if valid_types.contains(&rule_type) {
3428 Ok(())
3429 } else {
3430 Err(ValidationError::new("invalid_rule_type").with_message(
3431 format!(
3432 "'{}' is not valid. Must be one of: {:?}",
3433 rule_type, valid_types
3434 )
3435 .into(),
3436 ))
3437 }
3438}
3439
3440fn default_validation_level() -> String {
3441 "error".to_string()
3442}
3443
3444fn validate_validation_level(level: &str) -> Result<(), ValidationError> {
3445 match level {
3446 "error" | "warning" | "info" => Ok(()),
3447 _ => Err(ValidationError::new("invalid_validation_level")
3448 .with_message("validation level must be 'error', 'warning', or 'info'".into())),
3449 }
3450}
3451
3452#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3454#[serde(rename_all = "camelCase")]
3455pub struct SchemaRegistryAuthSpec {
3456 #[serde(default)]
3458 pub enabled: bool,
3459
3460 #[serde(default)]
3462 #[validate(custom(function = "validate_auth_method"))]
3463 pub method: Option<String>,
3464
3465 #[serde(default)]
3467 #[validate(custom(function = "validate_optional_k8s_name"))]
3468 pub credentials_secret_ref: Option<String>,
3469
3470 #[serde(default)]
3472 #[validate(nested)]
3473 pub jwt: JwtAuthSpec,
3474
3475 #[serde(default)]
3477 #[validate(length(max = 100, message = "maximum 100 users"))]
3478 pub users: Vec<SchemaRegistryUser>,
3479}
3480
3481fn validate_auth_method(method: &str) -> Result<(), ValidationError> {
3482 match method {
3483 "" | "basic" | "jwt" | "cedar" => Ok(()),
3484 _ => Err(ValidationError::new("invalid_auth_method")
3485 .with_message("auth method must be 'basic', 'jwt', or 'cedar'".into())),
3486 }
3487}
3488
3489#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3491#[serde(rename_all = "camelCase")]
3492pub struct JwtAuthSpec {
3493 #[serde(default)]
3495 pub issuer_url: Option<String>,
3496
3497 #[serde(default)]
3499 pub jwks_url: Option<String>,
3500
3501 #[serde(default)]
3503 pub audience: Option<String>,
3504
3505 #[serde(default = "default_username_claim")]
3507 pub username_claim: String,
3508
3509 #[serde(default = "default_roles_claim")]
3511 pub roles_claim: String,
3512}
3513
3514fn default_username_claim() -> String {
3515 "sub".to_string()
3516}
3517
3518fn default_roles_claim() -> String {
3519 "roles".to_string()
3520}
3521
3522#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3524#[serde(rename_all = "camelCase")]
3525pub struct SchemaRegistryUser {
3526 #[validate(length(min = 1, max = 128, message = "username must be 1-128 characters"))]
3528 pub username: String,
3529
3530 #[serde(default)]
3532 #[validate(custom(function = "validate_optional_k8s_name"))]
3533 pub password_secret_key: Option<String>,
3534
3535 #[serde(default = "default_user_role")]
3537 #[validate(custom(function = "validate_user_role"))]
3538 pub role: String,
3539
3540 #[serde(default)]
3542 pub allowed_subjects: Vec<String>,
3543}
3544
3545fn default_user_role() -> String {
3546 "reader".to_string()
3547}
3548
3549fn validate_user_role(role: &str) -> Result<(), ValidationError> {
3550 match role {
3551 "admin" | "writer" | "reader" => Ok(()),
3552 _ => Err(ValidationError::new("invalid_user_role")
3553 .with_message("role must be 'admin', 'writer', or 'reader'".into())),
3554 }
3555}
3556
3557#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3559#[serde(rename_all = "camelCase")]
3560pub struct SchemaRegistryTlsSpec {
3561 #[serde(default)]
3563 pub enabled: bool,
3564
3565 #[serde(default)]
3567 #[validate(custom(function = "validate_optional_k8s_name"))]
3568 pub cert_secret_name: Option<String>,
3569
3570 #[serde(default)]
3572 pub mtls_enabled: bool,
3573
3574 #[serde(default)]
3576 #[validate(custom(function = "validate_optional_k8s_name"))]
3577 pub ca_secret_name: Option<String>,
3578
3579 #[serde(default)]
3581 pub broker_tls_enabled: bool,
3582
3583 #[serde(default)]
3585 #[validate(custom(function = "validate_optional_k8s_name"))]
3586 pub broker_cert_secret_name: Option<String>,
3587}
3588
3589#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3591#[serde(rename_all = "camelCase")]
3592pub struct SchemaRegistryMetricsSpec {
3593 #[serde(default = "default_true")]
3595 pub enabled: bool,
3596
3597 #[serde(default = "default_schema_metrics_port")]
3599 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3600 pub port: i32,
3601
3602 #[serde(default = "default_metrics_path")]
3604 pub path: String,
3605
3606 #[serde(default)]
3608 pub service_monitor_enabled: bool,
3609
3610 #[serde(default = "default_scrape_interval")]
3612 #[validate(custom(function = "validate_duration"))]
3613 pub scrape_interval: String,
3614}
3615
3616fn default_schema_metrics_port() -> i32 {
3617 9090
3618}
3619
3620fn default_metrics_path() -> String {
3621 "/metrics".to_string()
3622}
3623
3624impl Default for SchemaRegistryMetricsSpec {
3625 fn default() -> Self {
3626 Self {
3627 enabled: true,
3628 port: 9090,
3629 path: "/metrics".to_string(),
3630 service_monitor_enabled: false,
3631 scrape_interval: "30s".to_string(),
3632 }
3633 }
3634}
3635
3636#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3638#[serde(rename_all = "camelCase")]
3639pub struct ExternalRegistrySpec {
3640 #[serde(default)]
3642 pub enabled: bool,
3643
3644 #[serde(default)]
3646 #[validate(custom(function = "validate_external_registry_type"))]
3647 pub registry_type: Option<String>,
3648
3649 #[serde(default)]
3651 pub confluent_url: Option<String>,
3652
3653 #[serde(default)]
3655 pub glue_registry_arn: Option<String>,
3656
3657 #[serde(default)]
3659 pub aws_region: Option<String>,
3660
3661 #[serde(default)]
3663 #[validate(custom(function = "validate_sync_mode"))]
3664 pub sync_mode: Option<String>,
3665
3666 #[serde(default)]
3668 pub sync_subjects: Vec<String>,
3669
3670 #[serde(default = "default_sync_interval")]
3672 #[validate(range(
3673 min = 10,
3674 max = 86400,
3675 message = "sync interval must be 10-86400 seconds"
3676 ))]
3677 pub sync_interval_seconds: i32,
3678
3679 #[serde(default)]
3681 #[validate(custom(function = "validate_optional_k8s_name"))]
3682 pub credentials_secret_ref: Option<String>,
3683}
3684
3685fn validate_external_registry_type(reg_type: &str) -> Result<(), ValidationError> {
3686 match reg_type {
3687 "" | "confluent" | "glue" => Ok(()),
3688 _ => Err(ValidationError::new("invalid_external_registry_type")
3689 .with_message("registry type must be 'confluent' or 'glue'".into())),
3690 }
3691}
3692
3693fn validate_sync_mode(mode: &str) -> Result<(), ValidationError> {
3694 match mode {
3695 "" | "mirror" | "push" | "bidirectional" => Ok(()),
3696 _ => Err(ValidationError::new("invalid_sync_mode")
3697 .with_message("sync mode must be 'mirror', 'push', or 'bidirectional'".into())),
3698 }
3699}
3700
3701fn default_sync_interval() -> i32 {
3702 300 }
3704
3705#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
3707#[serde(rename_all = "camelCase")]
3708pub struct RivvenSchemaRegistryStatus {
3709 #[serde(default)]
3711 pub phase: SchemaRegistryPhase,
3712
3713 pub replicas: i32,
3715
3716 pub ready_replicas: i32,
3718
3719 pub schemas_registered: i32,
3721
3722 pub subjects_count: i32,
3724
3725 pub contexts_count: i32,
3727
3728 pub observed_generation: i64,
3730
3731 #[serde(default)]
3733 pub conditions: Vec<SchemaRegistryCondition>,
3734
3735 #[serde(default)]
3737 pub endpoints: Vec<String>,
3738
3739 pub storage_status: Option<String>,
3741
3742 pub external_sync_status: Option<String>,
3744
3745 pub last_sync_time: Option<String>,
3747
3748 pub last_updated: Option<String>,
3750
3751 pub message: Option<String>,
3753}
3754
3755#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
3757pub enum SchemaRegistryPhase {
3758 #[default]
3760 Pending,
3761 Provisioning,
3763 Running,
3765 Updating,
3767 Degraded,
3769 Failed,
3771 Terminating,
3773}
3774
3775#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
3777#[serde(rename_all = "camelCase")]
3778pub struct SchemaRegistryCondition {
3779 #[serde(rename = "type")]
3781 pub condition_type: String,
3782
3783 pub status: String,
3785
3786 pub reason: Option<String>,
3788
3789 pub message: Option<String>,
3791
3792 pub last_transition_time: Option<String>,
3794}
3795
3796impl RivvenSchemaRegistrySpec {
3797 pub fn get_image(&self) -> String {
3799 if let Some(ref image) = self.image {
3800 image.clone()
3801 } else {
3802 format!("ghcr.io/hupe1980/rivven-schema:{}", self.version)
3803 }
3804 }
3805
3806 pub fn get_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3808 let mut labels = BTreeMap::new();
3809 labels.insert(
3810 "app.kubernetes.io/name".to_string(),
3811 "rivven-schema-registry".to_string(),
3812 );
3813 labels.insert(
3814 "app.kubernetes.io/instance".to_string(),
3815 registry_name.to_string(),
3816 );
3817 labels.insert(
3818 "app.kubernetes.io/component".to_string(),
3819 "schema-registry".to_string(),
3820 );
3821 labels.insert(
3822 "app.kubernetes.io/managed-by".to_string(),
3823 "rivven-operator".to_string(),
3824 );
3825 labels.insert(
3826 "app.kubernetes.io/version".to_string(),
3827 self.version.clone(),
3828 );
3829 labels
3830 }
3831
3832 pub fn get_selector_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3834 let mut labels = BTreeMap::new();
3835 labels.insert(
3836 "app.kubernetes.io/name".to_string(),
3837 "rivven-schema-registry".to_string(),
3838 );
3839 labels.insert(
3840 "app.kubernetes.io/instance".to_string(),
3841 registry_name.to_string(),
3842 );
3843 labels
3844 }
3845}
3846
3847#[cfg(test)]
3848mod tests {
3849 use super::*;
3850
3851 #[test]
3852 fn test_default_spec() {
3853 let spec = RivvenClusterSpec {
3854 replicas: 3,
3855 version: "0.0.1".to_string(),
3856 image: None,
3857 image_pull_policy: "IfNotPresent".to_string(),
3858 image_pull_secrets: vec![],
3859 storage: StorageSpec::default(),
3860 resources: None,
3861 config: BrokerConfig::default(),
3862 tls: TlsSpec::default(),
3863 metrics: MetricsSpec::default(),
3864 affinity: None,
3865 node_selector: BTreeMap::new(),
3866 tolerations: vec![],
3867 pod_disruption_budget: PdbSpec::default(),
3868 service_account: None,
3869 pod_annotations: BTreeMap::new(),
3870 pod_labels: BTreeMap::new(),
3871 env: vec![],
3872 liveness_probe: ProbeSpec::default(),
3873 readiness_probe: ProbeSpec::default(),
3874 security_context: None,
3875 container_security_context: None,
3876 };
3877
3878 assert_eq!(spec.replicas, 3);
3879 assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
3880 }
3881
3882 #[test]
3883 fn test_get_labels() {
3884 let spec = RivvenClusterSpec {
3885 replicas: 3,
3886 version: "0.0.1".to_string(),
3887 image: None,
3888 image_pull_policy: "IfNotPresent".to_string(),
3889 image_pull_secrets: vec![],
3890 storage: StorageSpec::default(),
3891 resources: None,
3892 config: BrokerConfig::default(),
3893 tls: TlsSpec::default(),
3894 metrics: MetricsSpec::default(),
3895 affinity: None,
3896 node_selector: BTreeMap::new(),
3897 tolerations: vec![],
3898 pod_disruption_budget: PdbSpec::default(),
3899 service_account: None,
3900 pod_annotations: BTreeMap::new(),
3901 pod_labels: BTreeMap::new(),
3902 env: vec![],
3903 liveness_probe: ProbeSpec::default(),
3904 readiness_probe: ProbeSpec::default(),
3905 security_context: None,
3906 container_security_context: None,
3907 };
3908
3909 let labels = spec.get_labels("my-cluster");
3910 assert_eq!(
3911 labels.get("app.kubernetes.io/name"),
3912 Some(&"rivven".to_string())
3913 );
3914 assert_eq!(
3915 labels.get("app.kubernetes.io/instance"),
3916 Some(&"my-cluster".to_string())
3917 );
3918 }
3919
3920 #[test]
3921 fn test_custom_image() {
3922 let spec = RivvenClusterSpec {
3923 replicas: 1,
3924 version: "0.0.1".to_string(),
3925 image: Some("my-registry/rivven:custom".to_string()),
3926 image_pull_policy: "Always".to_string(),
3927 image_pull_secrets: vec![],
3928 storage: StorageSpec::default(),
3929 resources: None,
3930 config: BrokerConfig::default(),
3931 tls: TlsSpec::default(),
3932 metrics: MetricsSpec::default(),
3933 affinity: None,
3934 node_selector: BTreeMap::new(),
3935 tolerations: vec![],
3936 pod_disruption_budget: PdbSpec::default(),
3937 service_account: None,
3938 pod_annotations: BTreeMap::new(),
3939 pod_labels: BTreeMap::new(),
3940 env: vec![],
3941 liveness_probe: ProbeSpec::default(),
3942 readiness_probe: ProbeSpec::default(),
3943 security_context: None,
3944 container_security_context: None,
3945 };
3946
3947 assert_eq!(spec.get_image(), "my-registry/rivven:custom");
3948 }
3949
3950 #[test]
3951 fn test_cluster_phase_default() {
3952 let phase = ClusterPhase::default();
3953 assert_eq!(phase, ClusterPhase::Pending);
3954 }
3955
3956 #[test]
3957 fn test_storage_spec_default() {
3958 let storage = StorageSpec::default();
3959 assert_eq!(storage.size, "10Gi");
3960 assert!(storage.storage_class_name.is_none());
3961 }
3962
3963 #[test]
3964 fn test_broker_config_defaults() {
3965 let config = BrokerConfig::default();
3966 assert_eq!(config.default_partitions, 3);
3967 assert_eq!(config.default_replication_factor, 2);
3968 assert!(config.auto_create_topics);
3969 }
3970
3971 #[test]
3972 fn test_probe_spec_defaults() {
3973 let probe = ProbeSpec::default();
3974 assert!(probe.enabled);
3975 assert_eq!(probe.initial_delay_seconds, 30);
3976 assert_eq!(probe.period_seconds, 10);
3977 }
3978
3979 #[test]
3980 fn test_validate_quantity_valid() {
3981 assert!(validate_quantity("10Gi").is_ok());
3982 assert!(validate_quantity("100Mi").is_ok());
3983 assert!(validate_quantity("1Ti").is_ok());
3984 assert!(validate_quantity("500").is_ok());
3985 assert!(validate_quantity("1.5Gi").is_ok());
3986 }
3987
3988 #[test]
3989 fn test_validate_quantity_invalid() {
3990 assert!(validate_quantity("10GB").is_err()); assert!(validate_quantity("abc").is_err()); assert!(validate_quantity("-10Gi").is_err()); assert!(validate_quantity("").is_err()); }
3995
3996 #[test]
3997 fn test_validate_k8s_name_valid() {
3998 assert!(validate_k8s_name("my-cluster").is_ok());
3999 assert!(validate_k8s_name("cluster123").is_ok());
4000 assert!(validate_k8s_name("a").is_ok());
4001 }
4002
4003 #[test]
4004 fn test_validate_k8s_name_invalid() {
4005 assert!(validate_k8s_name("My-Cluster").is_err()); assert!(validate_k8s_name("-cluster").is_err()); assert!(validate_k8s_name("cluster-").is_err()); assert!(validate_k8s_name("cluster_name").is_err()); }
4010
4011 #[test]
4012 fn test_validate_compression_type() {
4013 assert!(validate_compression_type("lz4").is_ok());
4014 assert!(validate_compression_type("zstd").is_ok());
4015 assert!(validate_compression_type("none").is_ok());
4016 assert!(validate_compression_type("invalid").is_err());
4017 }
4018
4019 #[test]
4020 fn test_validate_segment_size() {
4021 assert!(validate_segment_size(1_048_576).is_ok()); assert!(validate_segment_size(10_737_418_240).is_ok()); assert!(validate_segment_size(1_073_741_824).is_ok()); assert!(validate_segment_size(1_000).is_err()); assert!(validate_segment_size(20_000_000_000).is_err()); }
4027
4028 #[test]
4029 fn test_validate_message_size() {
4030 assert!(validate_message_size(1_024).is_ok()); assert!(validate_message_size(104_857_600).is_ok()); assert!(validate_message_size(1_048_576).is_ok()); assert!(validate_message_size(100).is_err()); assert!(validate_message_size(200_000_000).is_err()); }
4036
4037 #[test]
4038 fn test_validate_pull_policy() {
4039 assert!(validate_pull_policy("Always").is_ok());
4040 assert!(validate_pull_policy("IfNotPresent").is_ok());
4041 assert!(validate_pull_policy("Never").is_ok());
4042 assert!(validate_pull_policy("always").is_err()); assert!(validate_pull_policy("Invalid").is_err());
4044 }
4045
4046 #[test]
4047 fn test_validate_duration() {
4048 assert!(validate_duration("30s").is_ok());
4049 assert!(validate_duration("1m").is_ok());
4050 assert!(validate_duration("5m30s").is_ok());
4051 assert!(validate_duration("1h").is_ok());
4052 assert!(validate_duration("invalid").is_err());
4053 assert!(validate_duration("30").is_err()); }
4055
4056 #[test]
4057 fn test_validate_access_modes() {
4058 assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
4059 assert!(
4060 validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
4061 .is_ok()
4062 );
4063 assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
4064 }
4065
4066 #[test]
4068 fn test_connect_spec_defaults() {
4069 let spec = RivvenConnectSpec {
4070 cluster_ref: ClusterReference {
4071 name: "my-cluster".to_string(),
4072 namespace: None,
4073 },
4074 replicas: 1,
4075 version: "0.0.1".to_string(),
4076 image: None,
4077 image_pull_policy: "IfNotPresent".to_string(),
4078 image_pull_secrets: vec![],
4079 resources: None,
4080 config: ConnectConfigSpec::default(),
4081 sources: vec![],
4082 sinks: vec![],
4083 settings: GlobalConnectSettings::default(),
4084 tls: ConnectTlsSpec::default(),
4085 pod_annotations: BTreeMap::new(),
4086 pod_labels: BTreeMap::new(),
4087 env: vec![],
4088 node_selector: BTreeMap::new(),
4089 tolerations: vec![],
4090 affinity: None,
4091 service_account: None,
4092 security_context: None,
4093 container_security_context: None,
4094 };
4095 assert_eq!(spec.replicas, 1);
4096 }
4097
4098 #[test]
4099 fn test_connect_phase_default() {
4100 let phase = ConnectPhase::default();
4101 assert_eq!(phase, ConnectPhase::Pending);
4102 }
4103
4104 #[test]
4105 fn test_validate_connector_type() {
4106 assert!(validate_connector_type("postgres-cdc").is_ok());
4107 assert!(validate_connector_type("mysql-cdc").is_ok());
4108 assert!(validate_connector_type("http").is_ok());
4109 assert!(validate_connector_type("stdout").is_ok());
4110 assert!(validate_connector_type("s3").is_ok());
4111 assert!(validate_connector_type("datagen").is_ok());
4112 assert!(validate_connector_type("custom-connector").is_ok());
4113 }
4114
4115 #[test]
4116 fn test_validate_start_offset() {
4117 assert!(validate_start_offset("earliest").is_ok());
4118 assert!(validate_start_offset("latest").is_ok());
4119 assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
4120 assert!(validate_start_offset("invalid").is_err());
4121 }
4122
4123 #[test]
4124 fn test_validate_image_valid() {
4125 assert!(validate_image("nginx").is_ok());
4126 assert!(validate_image("nginx:latest").is_ok());
4127 assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
4128 assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
4129 assert!(validate_image("localhost:5000/myimage").is_ok());
4130 assert!(validate_image("").is_ok()); }
4132
4133 #[test]
4134 fn test_validate_image_invalid() {
4135 assert!(validate_image("/absolute/path").is_err()); assert!(validate_image("-invalid").is_err()); assert!(validate_image("image..path").is_err()); let long_name = "a".repeat(300);
4140 assert!(validate_image(&long_name).is_err());
4141 }
4142
4143 #[test]
4144 fn test_validate_node_selector() {
4145 let mut selectors = BTreeMap::new();
4146 selectors.insert("node-type".to_string(), "compute".to_string());
4147 assert!(validate_node_selector(&selectors).is_ok());
4148
4149 let mut many = BTreeMap::new();
4151 for i in 0..25 {
4152 many.insert(format!("key-{}", i), "value".to_string());
4153 }
4154 assert!(validate_node_selector(&many).is_err());
4155 }
4156
4157 #[test]
4158 fn test_validate_annotations() {
4159 let mut annotations = BTreeMap::new();
4160 annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
4161 assert!(validate_annotations(&annotations).is_ok());
4162
4163 let mut many = BTreeMap::new();
4165 for i in 0..55 {
4166 many.insert(format!("annotation-{}", i), "value".to_string());
4167 }
4168 assert!(validate_annotations(&many).is_err());
4169 }
4170
4171 #[test]
4172 fn test_validate_labels() {
4173 let mut labels = BTreeMap::new();
4174 labels.insert("team".to_string(), "platform".to_string());
4175 assert!(validate_labels(&labels).is_ok());
4176
4177 let mut reserved = BTreeMap::new();
4179 reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
4180 assert!(validate_labels(&reserved).is_err());
4181 }
4182
4183 #[test]
4184 fn test_validate_raw_config() {
4185 let mut config = BTreeMap::new();
4186 config.insert("custom.setting".to_string(), "value".to_string());
4187 assert!(validate_raw_config(&config).is_ok());
4188
4189 let mut forbidden = BTreeMap::new();
4191 forbidden.insert("command".to_string(), "/bin/sh".to_string());
4192 assert!(validate_raw_config(&forbidden).is_err());
4193
4194 let mut many = BTreeMap::new();
4196 for i in 0..55 {
4197 many.insert(format!("config-{}", i), "value".to_string());
4198 }
4199 assert!(validate_raw_config(&many).is_err());
4200 }
4201
4202 #[test]
4203 fn test_validate_int_or_percent() {
4204 assert!(validate_optional_int_or_percent("1").is_ok());
4205 assert!(validate_optional_int_or_percent("25%").is_ok());
4206 assert!(validate_optional_int_or_percent("100%").is_ok());
4207 assert!(validate_optional_int_or_percent("").is_ok()); assert!(validate_optional_int_or_percent("abc").is_err());
4209 assert!(validate_optional_int_or_percent("25%%").is_err());
4210 }
4211
4212 #[test]
4213 fn test_tls_spec_default() {
4214 let tls = TlsSpec::default();
4215 assert!(!tls.enabled);
4216 assert!(tls.cert_secret_name.is_none());
4217 assert!(!tls.mtls_enabled);
4218 }
4219
4220 #[test]
4221 fn test_metrics_spec_default() {
4222 let metrics = MetricsSpec::default();
4223 assert!(metrics.enabled);
4224 assert_eq!(metrics.port, 9090);
4225 }
4226
4227 #[test]
4228 fn test_pdb_spec_default() {
4229 let pdb = PdbSpec::default();
4230 assert!(pdb.enabled);
4231 assert!(pdb.min_available.is_none());
4232 assert_eq!(pdb.max_unavailable, Some("1".to_string()));
4233 }
4234
4235 #[test]
4236 fn test_service_monitor_labels() {
4237 let mut labels = BTreeMap::new();
4238 labels.insert("release".to_string(), "prometheus".to_string());
4239 assert!(validate_service_monitor_labels(&labels).is_ok());
4240
4241 let mut many = BTreeMap::new();
4243 for i in 0..15 {
4244 many.insert(format!("label-{}", i), "value".to_string());
4245 }
4246 assert!(validate_service_monitor_labels(&many).is_err());
4247 }
4248
4249 #[test]
4250 fn test_cluster_condition_time_format() {
4251 let condition = ClusterCondition {
4252 condition_type: "Ready".to_string(),
4253 status: "True".to_string(),
4254 last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4255 reason: Some("AllReplicasReady".to_string()),
4256 message: Some("All replicas are ready".to_string()),
4257 };
4258 assert!(condition.last_transition_time.unwrap().contains('T'));
4259 }
4260
4261 #[test]
4266 fn test_schema_registry_phase_default() {
4267 let phase = SchemaRegistryPhase::default();
4268 assert_eq!(phase, SchemaRegistryPhase::Pending);
4269 }
4270
4271 #[test]
4272 fn test_schema_registry_server_spec_default() {
4273 let spec = SchemaRegistryServerSpec::default();
4274 assert_eq!(spec.port, 8081);
4275 assert_eq!(spec.bind_address, "0.0.0.0");
4276 assert_eq!(spec.timeout_seconds, 30);
4277 assert_eq!(spec.max_request_size, 10_485_760);
4278 assert!(!spec.cors_enabled);
4279 }
4280
4281 #[test]
4282 fn test_schema_registry_storage_spec_default() {
4283 let spec = SchemaRegistryStorageSpec::default();
4284 assert_eq!(spec.mode, "broker");
4285 assert_eq!(spec.topic, "_schemas");
4286 assert_eq!(spec.replication_factor, 3);
4287 assert_eq!(spec.partitions, 1);
4288 assert!(spec.normalize);
4289 }
4290
4291 #[test]
4292 fn test_validate_storage_mode() {
4293 assert!(validate_storage_mode("memory").is_ok());
4294 assert!(validate_storage_mode("broker").is_ok());
4295 assert!(validate_storage_mode("invalid").is_err());
4296 }
4297
4298 #[test]
4299 fn test_schema_compatibility_spec_default() {
4300 let spec = SchemaCompatibilitySpec::default();
4301 assert_eq!(spec.default_level, "BACKWARD");
4302 assert!(spec.allow_overrides);
4303 assert!(spec.subjects.is_empty());
4304 }
4305
4306 #[test]
4307 fn test_validate_compatibility_level() {
4308 assert!(validate_compatibility_level("BACKWARD").is_ok());
4309 assert!(validate_compatibility_level("BACKWARD_TRANSITIVE").is_ok());
4310 assert!(validate_compatibility_level("FORWARD").is_ok());
4311 assert!(validate_compatibility_level("FORWARD_TRANSITIVE").is_ok());
4312 assert!(validate_compatibility_level("FULL").is_ok());
4313 assert!(validate_compatibility_level("FULL_TRANSITIVE").is_ok());
4314 assert!(validate_compatibility_level("NONE").is_ok());
4315 assert!(validate_compatibility_level("invalid").is_err());
4316 assert!(validate_compatibility_level("backward").is_err()); }
4318
4319 #[test]
4320 fn test_schema_format_spec_default() {
4321 let spec = SchemaFormatSpec::default();
4322 assert!(spec.avro);
4323 assert!(spec.json_schema);
4324 assert!(spec.protobuf);
4325 assert!(spec.strict_validation);
4326 }
4327
4328 #[test]
4329 fn test_schema_contexts_spec_default() {
4330 let spec = SchemaContextsSpec::default();
4331 assert!(!spec.enabled);
4332 assert_eq!(spec.max_contexts, 0);
4333 assert!(spec.predefined.is_empty());
4334 }
4335
4336 #[test]
4337 fn test_schema_validation_spec_default() {
4338 let spec = SchemaValidationSpec::default();
4339 assert!(!spec.enabled);
4340 assert_eq!(spec.max_schema_size, 1_048_576);
4341 assert!(spec.rules.is_empty());
4342 }
4343
4344 #[test]
4345 fn test_validate_rule_type() {
4346 assert!(validate_rule_type("regex").is_ok());
4347 assert!(validate_rule_type("field_exists").is_ok());
4348 assert!(validate_rule_type("field_type").is_ok());
4349 assert!(validate_rule_type("naming_convention").is_ok());
4350 assert!(validate_rule_type("documentation").is_ok());
4351 assert!(validate_rule_type("invalid").is_err());
4352 }
4353
4354 #[test]
4355 fn test_validate_validation_level() {
4356 assert!(validate_validation_level("error").is_ok());
4357 assert!(validate_validation_level("warning").is_ok());
4358 assert!(validate_validation_level("info").is_ok());
4359 assert!(validate_validation_level("invalid").is_err());
4360 }
4361
4362 #[test]
4363 fn test_schema_registry_auth_spec_default() {
4364 let spec = SchemaRegistryAuthSpec::default();
4365 assert!(!spec.enabled);
4366 assert!(spec.method.is_none());
4367 assert!(spec.credentials_secret_ref.is_none());
4368 }
4369
4370 #[test]
4371 fn test_validate_auth_method() {
4372 assert!(validate_auth_method("basic").is_ok());
4373 assert!(validate_auth_method("jwt").is_ok());
4374 assert!(validate_auth_method("cedar").is_ok());
4375 assert!(validate_auth_method("").is_ok());
4376 assert!(validate_auth_method("invalid").is_err());
4377 }
4378
4379 #[test]
4380 fn test_jwt_auth_spec_default() {
4381 let spec = JwtAuthSpec::default();
4382 assert!(spec.issuer_url.is_none());
4383 assert!(spec.jwks_url.is_none());
4384 assert!(spec.username_claim.is_empty());
4386 assert!(spec.roles_claim.is_empty());
4387 }
4388
4389 #[test]
4390 fn test_validate_user_role() {
4391 assert!(validate_user_role("admin").is_ok());
4392 assert!(validate_user_role("writer").is_ok());
4393 assert!(validate_user_role("reader").is_ok());
4394 assert!(validate_user_role("invalid").is_err());
4395 }
4396
4397 #[test]
4398 fn test_schema_registry_tls_spec_default() {
4399 let spec = SchemaRegistryTlsSpec::default();
4400 assert!(!spec.enabled);
4401 assert!(spec.cert_secret_name.is_none());
4402 assert!(!spec.mtls_enabled);
4403 assert!(!spec.broker_tls_enabled);
4404 }
4405
4406 #[test]
4407 fn test_schema_registry_metrics_spec_default() {
4408 let spec = SchemaRegistryMetricsSpec::default();
4409 assert!(spec.enabled);
4410 assert_eq!(spec.port, 9090);
4411 assert_eq!(spec.path, "/metrics");
4412 assert!(!spec.service_monitor_enabled);
4413 assert_eq!(spec.scrape_interval, "30s");
4414 }
4415
4416 #[test]
4417 fn test_external_registry_spec_default() {
4418 let spec = ExternalRegistrySpec::default();
4419 assert!(!spec.enabled);
4420 assert!(spec.registry_type.is_none());
4421 assert!(spec.confluent_url.is_none());
4422 assert!(spec.glue_registry_arn.is_none());
4423 assert_eq!(spec.sync_interval_seconds, 0);
4425 }
4426
4427 #[test]
4428 fn test_validate_external_registry_type() {
4429 assert!(validate_external_registry_type("confluent").is_ok());
4430 assert!(validate_external_registry_type("glue").is_ok());
4431 assert!(validate_external_registry_type("").is_ok());
4432 assert!(validate_external_registry_type("invalid").is_err());
4433 }
4434
4435 #[test]
4436 fn test_validate_sync_mode() {
4437 assert!(validate_sync_mode("mirror").is_ok());
4438 assert!(validate_sync_mode("push").is_ok());
4439 assert!(validate_sync_mode("bidirectional").is_ok());
4440 assert!(validate_sync_mode("").is_ok());
4441 assert!(validate_sync_mode("invalid").is_err());
4442 }
4443
4444 #[test]
4445 fn test_schema_registry_spec_get_image_default() {
4446 let spec = RivvenSchemaRegistrySpec {
4447 cluster_ref: ClusterReference {
4448 name: "test".to_string(),
4449 namespace: None,
4450 },
4451 replicas: 1,
4452 version: "0.0.1".to_string(),
4453 image: None,
4454 image_pull_policy: "IfNotPresent".to_string(),
4455 image_pull_secrets: vec![],
4456 resources: None,
4457 server: SchemaRegistryServerSpec::default(),
4458 storage: SchemaRegistryStorageSpec::default(),
4459 compatibility: SchemaCompatibilitySpec::default(),
4460 schemas: SchemaFormatSpec::default(),
4461 contexts: SchemaContextsSpec::default(),
4462 validation: SchemaValidationSpec::default(),
4463 auth: SchemaRegistryAuthSpec::default(),
4464 tls: SchemaRegistryTlsSpec::default(),
4465 metrics: SchemaRegistryMetricsSpec::default(),
4466 external: ExternalRegistrySpec::default(),
4467 pod_annotations: BTreeMap::new(),
4468 pod_labels: BTreeMap::new(),
4469 env: vec![],
4470 node_selector: BTreeMap::new(),
4471 tolerations: vec![],
4472 affinity: None,
4473 service_account: None,
4474 security_context: None,
4475 container_security_context: None,
4476 liveness_probe: ProbeSpec::default(),
4477 readiness_probe: ProbeSpec::default(),
4478 pod_disruption_budget: PdbSpec::default(),
4479 };
4480 assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-schema:0.0.1");
4481 }
4482
4483 #[test]
4484 fn test_schema_registry_spec_get_image_custom() {
4485 let spec = RivvenSchemaRegistrySpec {
4486 cluster_ref: ClusterReference {
4487 name: "test".to_string(),
4488 namespace: None,
4489 },
4490 replicas: 1,
4491 version: "0.0.1".to_string(),
4492 image: Some("custom/schema-registry:latest".to_string()),
4493 image_pull_policy: "IfNotPresent".to_string(),
4494 image_pull_secrets: vec![],
4495 resources: None,
4496 server: SchemaRegistryServerSpec::default(),
4497 storage: SchemaRegistryStorageSpec::default(),
4498 compatibility: SchemaCompatibilitySpec::default(),
4499 schemas: SchemaFormatSpec::default(),
4500 contexts: SchemaContextsSpec::default(),
4501 validation: SchemaValidationSpec::default(),
4502 auth: SchemaRegistryAuthSpec::default(),
4503 tls: SchemaRegistryTlsSpec::default(),
4504 metrics: SchemaRegistryMetricsSpec::default(),
4505 external: ExternalRegistrySpec::default(),
4506 pod_annotations: BTreeMap::new(),
4507 pod_labels: BTreeMap::new(),
4508 env: vec![],
4509 node_selector: BTreeMap::new(),
4510 tolerations: vec![],
4511 affinity: None,
4512 service_account: None,
4513 security_context: None,
4514 container_security_context: None,
4515 liveness_probe: ProbeSpec::default(),
4516 readiness_probe: ProbeSpec::default(),
4517 pod_disruption_budget: PdbSpec::default(),
4518 };
4519 assert_eq!(spec.get_image(), "custom/schema-registry:latest");
4520 }
4521
4522 #[test]
4523 fn test_schema_registry_spec_get_labels() {
4524 let spec = RivvenSchemaRegistrySpec {
4525 cluster_ref: ClusterReference {
4526 name: "test".to_string(),
4527 namespace: None,
4528 },
4529 replicas: 1,
4530 version: "0.0.1".to_string(),
4531 image: None,
4532 image_pull_policy: "IfNotPresent".to_string(),
4533 image_pull_secrets: vec![],
4534 resources: None,
4535 server: SchemaRegistryServerSpec::default(),
4536 storage: SchemaRegistryStorageSpec::default(),
4537 compatibility: SchemaCompatibilitySpec::default(),
4538 schemas: SchemaFormatSpec::default(),
4539 contexts: SchemaContextsSpec::default(),
4540 validation: SchemaValidationSpec::default(),
4541 auth: SchemaRegistryAuthSpec::default(),
4542 tls: SchemaRegistryTlsSpec::default(),
4543 metrics: SchemaRegistryMetricsSpec::default(),
4544 external: ExternalRegistrySpec::default(),
4545 pod_annotations: BTreeMap::new(),
4546 pod_labels: BTreeMap::new(),
4547 env: vec![],
4548 node_selector: BTreeMap::new(),
4549 tolerations: vec![],
4550 affinity: None,
4551 service_account: None,
4552 security_context: None,
4553 container_security_context: None,
4554 liveness_probe: ProbeSpec::default(),
4555 readiness_probe: ProbeSpec::default(),
4556 pod_disruption_budget: PdbSpec::default(),
4557 };
4558
4559 let labels = spec.get_labels("my-registry");
4560 assert_eq!(
4561 labels.get("app.kubernetes.io/name"),
4562 Some(&"rivven-schema-registry".to_string())
4563 );
4564 assert_eq!(
4565 labels.get("app.kubernetes.io/instance"),
4566 Some(&"my-registry".to_string())
4567 );
4568 assert_eq!(
4569 labels.get("app.kubernetes.io/component"),
4570 Some(&"schema-registry".to_string())
4571 );
4572 }
4573
4574 #[test]
4575 fn test_schema_registry_condition_time_format() {
4576 let condition = SchemaRegistryCondition {
4577 condition_type: "Ready".to_string(),
4578 status: "True".to_string(),
4579 last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4580 reason: Some("AllReplicasReady".to_string()),
4581 message: Some("All replicas are ready".to_string()),
4582 };
4583 assert!(condition.last_transition_time.unwrap().contains('T'));
4584 }
4585
4586 #[test]
4587 fn test_validate_subject_compatibility_map() {
4588 let mut subjects = BTreeMap::new();
4589 subjects.insert("orders-value".to_string(), "BACKWARD".to_string());
4590 subjects.insert("users-value".to_string(), "FULL".to_string());
4591 assert!(validate_subject_compatibility_map(&subjects).is_ok());
4592
4593 let mut invalid = BTreeMap::new();
4595 invalid.insert("test".to_string(), "invalid".to_string());
4596 assert!(validate_subject_compatibility_map(&invalid).is_err());
4597
4598 let mut long_name = BTreeMap::new();
4600 long_name.insert("a".repeat(300), "BACKWARD".to_string());
4601 assert!(validate_subject_compatibility_map(&long_name).is_err());
4602 }
4603
4604 #[test]
4609 fn test_snapshot_cdc_config_spec_serde_defaults() {
4610 let json = r#"{}"#;
4612 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4613 assert_eq!(config.batch_size, 10_000);
4614 assert_eq!(config.parallel_tables, 4);
4615 assert_eq!(config.query_timeout_secs, 300);
4616 assert_eq!(config.throttle_delay_ms, 0);
4617 assert_eq!(config.max_retries, 3);
4618 assert!(config.include_tables.is_empty());
4619 assert!(config.exclude_tables.is_empty());
4620 }
4621
4622 #[test]
4623 fn test_snapshot_cdc_config_spec_validation() {
4624 let json = r#"{"batchSize": 50}"#;
4625 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4626 assert!(config.validate().is_err());
4627
4628 let json = r#"{"batchSize": 5000, "parallelTables": 50}"#;
4629 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4630 assert!(config.validate().is_err());
4631 }
4632
4633 #[test]
4634 fn test_incremental_snapshot_spec_serde_defaults() {
4635 let json = r#"{}"#;
4636 let config: IncrementalSnapshotSpec = serde_json::from_str(json).unwrap();
4637 assert!(!config.enabled);
4638 assert_eq!(config.chunk_size, 1024);
4639 assert!(config.watermark_strategy.is_empty());
4640 assert_eq!(config.max_concurrent_chunks, 1);
4641 }
4642
4643 #[test]
4644 fn test_validate_watermark_strategy() {
4645 assert!(validate_watermark_strategy("insert").is_ok());
4646 assert!(validate_watermark_strategy("update_and_insert").is_ok());
4647 assert!(validate_watermark_strategy("").is_ok());
4648 assert!(validate_watermark_strategy("invalid").is_err());
4649 }
4650
4651 #[test]
4652 fn test_heartbeat_cdc_spec_serde_defaults() {
4653 let json = r#"{}"#;
4654 let config: HeartbeatCdcSpec = serde_json::from_str(json).unwrap();
4655 assert!(!config.enabled);
4656 assert_eq!(config.interval_secs, 10);
4657 assert_eq!(config.max_lag_secs, 300);
4658 assert!(!config.emit_events);
4659 }
4660
4661 #[test]
4662 fn test_deduplication_cdc_spec_serde_defaults() {
4663 let json = r#"{}"#;
4664 let config: DeduplicationCdcSpec = serde_json::from_str(json).unwrap();
4665 assert!(!config.enabled);
4666 assert_eq!(config.bloom_expected_insertions, 100_000);
4667 assert_eq!(config.bloom_fpp, 0.01);
4668 assert_eq!(config.lru_size, 10_000);
4669 assert_eq!(config.window_secs, 3600);
4670 }
4671
4672 #[test]
4673 fn test_transaction_topic_spec_serde_defaults() {
4674 let json = r#"{}"#;
4675 let config: TransactionTopicSpec = serde_json::from_str(json).unwrap();
4676 assert!(!config.enabled);
4677 assert!(config.topic_name.is_none());
4678 assert!(config.include_data_collections);
4679 assert_eq!(config.min_events_threshold, 0);
4680 }
4681
4682 #[test]
4683 fn test_schema_change_topic_spec_serde_defaults() {
4684 let json = r#"{}"#;
4685 let config: SchemaChangeTopicSpec = serde_json::from_str(json).unwrap();
4686 assert!(!config.enabled);
4687 assert!(config.topic_name.is_none());
4688 assert!(config.include_columns);
4689 assert!(config.schemas.is_empty());
4690 }
4691
4692 #[test]
4693 fn test_tombstone_cdc_spec_serde_defaults() {
4694 let json = r#"{}"#;
4695 let config: TombstoneCdcSpec = serde_json::from_str(json).unwrap();
4696 assert!(!config.enabled);
4697 assert!(config.after_delete);
4698 assert!(config.behavior.is_empty());
4699 }
4700
4701 #[test]
4702 fn test_validate_tombstone_behavior() {
4703 assert!(validate_tombstone_behavior("emit_null").is_ok());
4704 assert!(validate_tombstone_behavior("emit_with_key").is_ok());
4705 assert!(validate_tombstone_behavior("").is_ok());
4706 assert!(validate_tombstone_behavior("invalid").is_err());
4707 }
4708
4709 #[test]
4710 fn test_field_encryption_spec_serde_defaults() {
4711 let json = r#"{}"#;
4712 let config: FieldEncryptionSpec = serde_json::from_str(json).unwrap();
4713 assert!(!config.enabled);
4714 assert!(config.key_secret_ref.is_none());
4715 assert!(config.fields.is_empty());
4716 assert_eq!(config.algorithm, "aes-256-gcm");
4717 }
4718
4719 #[test]
4720 fn test_read_only_replica_spec_serde_defaults() {
4721 let json = r#"{}"#;
4722 let config: ReadOnlyReplicaSpec = serde_json::from_str(json).unwrap();
4723 assert!(!config.enabled);
4724 assert_eq!(config.lag_threshold_ms, 5000);
4725 assert!(config.deduplicate);
4726 assert!(config.watermark_source.is_empty());
4727 }
4728
4729 #[test]
4730 fn test_validate_watermark_source() {
4731 assert!(validate_watermark_source("primary").is_ok());
4732 assert!(validate_watermark_source("replica").is_ok());
4733 assert!(validate_watermark_source("").is_ok());
4734 assert!(validate_watermark_source("invalid").is_err());
4735 }
4736
4737 #[test]
4738 fn test_event_router_spec_serde_defaults() {
4739 let json = r#"{}"#;
4740 let config: EventRouterSpec = serde_json::from_str(json).unwrap();
4741 assert!(!config.enabled);
4742 assert!(config.default_destination.is_none());
4743 assert!(config.dead_letter_queue.is_none());
4744 assert!(!config.drop_unroutable);
4745 assert!(config.rules.is_empty());
4746 }
4747
4748 #[test]
4749 fn test_validate_route_condition_type() {
4750 assert!(validate_route_condition_type("always").is_ok());
4751 assert!(validate_route_condition_type("table").is_ok());
4752 assert!(validate_route_condition_type("table_pattern").is_ok());
4753 assert!(validate_route_condition_type("schema").is_ok());
4754 assert!(validate_route_condition_type("operation").is_ok());
4755 assert!(validate_route_condition_type("field_equals").is_ok());
4756 assert!(validate_route_condition_type("field_exists").is_ok());
4757 assert!(validate_route_condition_type("invalid").is_err());
4758 }
4759
4760 #[test]
4761 fn test_partitioner_spec_serde_defaults() {
4762 let json = r#"{}"#;
4763 let config: PartitionerSpec = serde_json::from_str(json).unwrap();
4764 assert!(!config.enabled);
4765 assert_eq!(config.num_partitions, 16);
4766 assert_eq!(config.strategy, "key_hash");
4767 assert!(config.key_columns.is_empty());
4768 }
4769
4770 #[test]
4771 fn test_validate_partition_strategy() {
4772 assert!(validate_partition_strategy("round_robin").is_ok());
4773 assert!(validate_partition_strategy("key_hash").is_ok());
4774 assert!(validate_partition_strategy("table_hash").is_ok());
4775 assert!(validate_partition_strategy("full_table_hash").is_ok());
4776 assert!(validate_partition_strategy("sticky").is_ok());
4777 assert!(validate_partition_strategy("invalid").is_err());
4778 }
4779
4780 #[test]
4781 fn test_validate_smt_transform_type() {
4782 assert!(validate_smt_transform_type("extract_new_record_state").is_ok());
4783 assert!(validate_smt_transform_type("mask_field").is_ok());
4784 assert!(validate_smt_transform_type("filter").is_ok());
4785 assert!(validate_smt_transform_type("flatten").is_ok());
4786 assert!(validate_smt_transform_type("cast").is_ok());
4787 assert!(validate_smt_transform_type("regex_router").is_ok());
4788 assert!(validate_smt_transform_type("content_router").is_ok());
4789 assert!(validate_smt_transform_type("invalid").is_err());
4790 }
4791
4792 #[test]
4793 fn test_parallel_cdc_spec_serde_defaults() {
4794 let json = r#"{}"#;
4795 let config: ParallelCdcSpec = serde_json::from_str(json).unwrap();
4796 assert!(!config.enabled);
4797 assert_eq!(config.concurrency, 4);
4798 assert_eq!(config.per_table_buffer, 1000);
4799 assert_eq!(config.output_buffer, 10_000);
4800 assert!(config.work_stealing);
4801 assert!(config.per_table_rate_limit.is_none());
4802 assert_eq!(config.shutdown_timeout_secs, 30);
4803 }
4804
4805 #[test]
4806 fn test_outbox_spec_serde_defaults() {
4807 let json = r#"{}"#;
4808 let config: OutboxSpec = serde_json::from_str(json).unwrap();
4809 assert!(!config.enabled);
4810 assert_eq!(config.table_name, "outbox");
4811 assert_eq!(config.poll_interval_ms, 100);
4812 assert_eq!(config.batch_size, 100);
4813 assert_eq!(config.max_retries, 3);
4814 assert_eq!(config.delivery_timeout_secs, 30);
4815 assert!(config.ordered_delivery);
4816 assert_eq!(config.retention_secs, 86400);
4817 assert_eq!(config.max_concurrency, 10);
4818 }
4819
4820 #[test]
4821 fn test_health_monitor_spec_serde_defaults() {
4822 let json = r#"{}"#;
4823 let config: HealthMonitorSpec = serde_json::from_str(json).unwrap();
4824 assert!(!config.enabled);
4825 assert_eq!(config.check_interval_secs, 10);
4826 assert_eq!(config.max_lag_ms, 30_000);
4827 assert_eq!(config.failure_threshold, 3);
4828 assert_eq!(config.success_threshold, 2);
4829 assert_eq!(config.check_timeout_secs, 5);
4830 assert!(config.auto_recovery);
4831 assert_eq!(config.recovery_delay_secs, 1);
4832 assert_eq!(config.max_recovery_delay_secs, 60);
4833 }
4834
4835 #[test]
4836 fn test_signal_table_spec_serde_defaults() {
4837 let json = r#"{}"#;
4838 let config: SignalTableSpec = serde_json::from_str(json).unwrap();
4839 assert!(!config.enabled);
4840 assert!(config.data_collection.is_none());
4841 assert!(config.topic.is_none());
4842 assert!(config.enabled_channels.is_empty());
4843 assert_eq!(config.poll_interval_ms, 1000);
4844 }
4845}