1use k8s_openapi::api::core::v1::ResourceRequirements;
7use kube::CustomResource;
8use regex::Regex;
9use schemars::JsonSchema;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeMap;
12use std::sync::LazyLock;
13use validator::{Validate, ValidationError};
14
15static QUANTITY_REGEX: LazyLock<Regex> =
17 LazyLock::new(|| Regex::new(r"^[0-9]+(\.[0-9]+)?(Ki|Mi|Gi|Ti|Pi|Ei|k|M|G|T|P|E)?$").unwrap());
18
19static NAME_REGEX: LazyLock<Regex> =
21 LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
22
23fn validate_quantity(value: &str) -> Result<(), ValidationError> {
25 if QUANTITY_REGEX.is_match(value) {
26 Ok(())
27 } else {
28 Err(ValidationError::new("invalid_quantity")
29 .with_message(format!("'{}' is not a valid Kubernetes quantity", value).into()))
30 }
31}
32
33fn validate_image(value: &str) -> Result<(), ValidationError> {
35 if value.is_empty() {
36 return Ok(()); }
38 if value.len() > 255 {
39 return Err(ValidationError::new("image_too_long")
40 .with_message("image reference exceeds 255 characters".into()));
41 }
42 if value.contains("..") || value.starts_with('/') || value.starts_with('-') {
44 return Err(ValidationError::new("invalid_image")
45 .with_message(format!("'{}' is not a valid container image", value).into()));
46 }
47 Ok(())
48}
49
50fn validate_k8s_name(value: &str) -> Result<(), ValidationError> {
52 if value.is_empty() {
53 return Ok(()); }
55 if value.len() > 63 {
56 return Err(
57 ValidationError::new("name_too_long").with_message("name exceeds 63 characters".into())
58 );
59 }
60 if !NAME_REGEX.is_match(value) {
61 return Err(ValidationError::new("invalid_name").with_message(
62 format!("'{}' is not a valid Kubernetes name (RFC 1123)", value).into(),
63 ));
64 }
65 Ok(())
66}
67
68static ENV_NAME_REGEX: LazyLock<Regex> =
71 LazyLock::new(|| Regex::new(r"^[a-zA-Z_][a-zA-Z0-9_]*$").unwrap());
72
73static SHELL_METACHAR_REGEX: LazyLock<Regex> =
75 LazyLock::new(|| Regex::new(r"[`$;|&<>()\\!]").unwrap());
76
77const MAX_ENV_VALUE_LEN: usize = 32_768;
79
80fn validate_env_vars(vars: &[k8s_openapi::api::core::v1::EnvVar]) -> Result<(), ValidationError> {
90 const MAX_ENV_VARS: usize = 100;
92 if vars.len() > MAX_ENV_VARS {
93 return Err(ValidationError::new("too_many_env_vars").with_message(
94 format!("maximum {} environment variables allowed", MAX_ENV_VARS).into(),
95 ));
96 }
97 for var in vars {
98 if var.name.is_empty() || var.name.len() > 256 {
100 return Err(ValidationError::new("invalid_env_name")
101 .with_message("environment variable name must be 1-256 characters".into()));
102 }
103
104 if !ENV_NAME_REGEX.is_match(&var.name) {
106 return Err(
107 ValidationError::new("invalid_env_name_format").with_message(
108 format!(
109 "environment variable name '{}' is not POSIX-compliant \
110 (must match [a-zA-Z_][a-zA-Z0-9_]*)",
111 var.name
112 )
113 .into(),
114 ),
115 );
116 }
117
118 if let Some(ref value) = var.value {
120 if value.len() > MAX_ENV_VALUE_LEN {
121 return Err(ValidationError::new("env_value_too_long").with_message(
122 format!(
123 "environment variable '{}' value exceeds {} byte limit",
124 var.name, MAX_ENV_VALUE_LEN
125 )
126 .into(),
127 ));
128 }
129 if SHELL_METACHAR_REGEX.is_match(value) {
130 return Err(
131 ValidationError::new("env_value_shell_metachar").with_message(
132 format!(
133 "environment variable '{}' value contains shell metacharacters \
134 (backtick, $, ;, |, &, <, >, (, ), \\, ! are not allowed)",
135 var.name
136 )
137 .into(),
138 ),
139 );
140 }
141 }
142
143 let forbidden_names = [
145 "LD_PRELOAD",
146 "LD_LIBRARY_PATH",
147 "DYLD_INSERT_LIBRARIES",
148 "DYLD_LIBRARY_PATH",
149 ];
150 let forbidden_prefixes = ["LD_AUDIT"];
151 let has_value = var.value.is_some() || var.value_from.is_some();
153 for name in forbidden_names {
154 if var.name == name && has_value {
155 return Err(ValidationError::new("forbidden_env_var").with_message(
156 format!(
157 "environment variable '{}' is not allowed for security \
158 (blocked for both direct values and value_from references)",
159 var.name
160 )
161 .into(),
162 ));
163 }
164 }
165 for prefix in forbidden_prefixes {
166 if var.name.starts_with(prefix) && has_value {
167 return Err(ValidationError::new("forbidden_env_var").with_message(
168 format!(
169 "environment variable '{}' is not allowed for security \
170 (blocked for both direct values and value_from references)",
171 var.name
172 )
173 .into(),
174 ));
175 }
176 }
177 }
178 Ok(())
179}
180
181#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
187#[kube(
188 group = "rivven.hupe1980.github.io",
189 version = "v1alpha1",
190 kind = "RivvenCluster",
191 plural = "rivvenclusters",
192 shortname = "rc",
193 namespaced,
194 status = "RivvenClusterStatus",
195 printcolumn = r#"{"name":"Replicas", "type":"integer", "jsonPath":".spec.replicas"}"#,
196 printcolumn = r#"{"name":"Ready", "type":"integer", "jsonPath":".status.readyReplicas"}"#,
197 printcolumn = r#"{"name":"Phase", "type":"string", "jsonPath":".status.phase"}"#,
198 printcolumn = r#"{"name":"Age", "type":"date", "jsonPath":".metadata.creationTimestamp"}"#
199)]
200#[serde(rename_all = "camelCase")]
201pub struct RivvenClusterSpec {
202 #[serde(default = "default_replicas")]
204 #[validate(range(min = 1, max = 100, message = "replicas must be between 1 and 100"))]
205 pub replicas: i32,
206
207 #[serde(default = "default_version")]
209 #[validate(length(min = 1, max = 64, message = "version must be 1-64 characters"))]
210 pub version: String,
211
212 #[serde(default)]
215 #[validate(custom(function = "validate_optional_image"))]
216 pub image: Option<String>,
217
218 #[serde(default = "default_image_pull_policy")]
220 #[validate(custom(function = "validate_pull_policy"))]
221 pub image_pull_policy: String,
222
223 #[serde(default)]
225 #[validate(length(max = 10, message = "maximum 10 image pull secrets allowed"))]
226 pub image_pull_secrets: Vec<String>,
227
228 #[serde(default)]
230 #[validate(nested)]
231 pub storage: StorageSpec,
232
233 #[serde(default)]
235 #[schemars(skip)]
236 pub resources: Option<ResourceRequirements>,
237
238 #[serde(default)]
240 #[validate(nested)]
241 pub config: BrokerConfig,
242
243 #[serde(default)]
245 #[validate(nested)]
246 pub tls: TlsSpec,
247
248 #[serde(default)]
250 #[validate(nested)]
251 pub metrics: MetricsSpec,
252
253 #[serde(default)]
255 #[schemars(skip)]
256 pub affinity: Option<k8s_openapi::api::core::v1::Affinity>,
257
258 #[serde(default)]
260 #[validate(custom(function = "validate_node_selector"))]
261 pub node_selector: BTreeMap<String, String>,
262
263 #[serde(default)]
265 #[schemars(skip)]
266 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
267
268 #[serde(default)]
270 #[validate(nested)]
271 pub pod_disruption_budget: PdbSpec,
272
273 #[serde(default)]
275 #[validate(custom(function = "validate_optional_k8s_name"))]
276 pub service_account: Option<String>,
277
278 #[serde(default)]
280 #[validate(custom(function = "validate_annotations"))]
281 pub pod_annotations: BTreeMap<String, String>,
282
283 #[serde(default)]
285 #[validate(custom(function = "validate_labels"))]
286 pub pod_labels: BTreeMap<String, String>,
287
288 #[serde(default)]
290 #[schemars(skip)]
291 #[validate(custom(function = "validate_env_vars"))]
292 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
293
294 #[serde(default)]
296 #[validate(nested)]
297 pub liveness_probe: ProbeSpec,
298
299 #[serde(default)]
301 #[validate(nested)]
302 pub readiness_probe: ProbeSpec,
303
304 #[serde(default)]
306 #[schemars(skip)]
307 pub security_context: Option<k8s_openapi::api::core::v1::PodSecurityContext>,
308
309 #[serde(default)]
311 #[schemars(skip)]
312 pub container_security_context: Option<k8s_openapi::api::core::v1::SecurityContext>,
313}
314
315fn validate_optional_image(image: &str) -> Result<(), ValidationError> {
317 validate_image(image)
318}
319
320fn validate_pull_policy(policy: &str) -> Result<(), ValidationError> {
322 match policy {
323 "Always" | "IfNotPresent" | "Never" => Ok(()),
324 _ => Err(ValidationError::new("invalid_pull_policy")
325 .with_message("imagePullPolicy must be Always, IfNotPresent, or Never".into())),
326 }
327}
328
329fn validate_node_selector(selectors: &BTreeMap<String, String>) -> Result<(), ValidationError> {
331 if selectors.len() > 20 {
332 return Err(ValidationError::new("too_many_selectors")
333 .with_message("maximum 20 node selectors allowed".into()));
334 }
335 for (key, value) in selectors {
336 if key.len() > 253 || value.len() > 63 {
337 return Err(ValidationError::new("selector_too_long")
338 .with_message("selector key max 253 chars, value max 63 chars".into()));
339 }
340 }
341 Ok(())
342}
343
344fn validate_optional_k8s_name(name: &str) -> Result<(), ValidationError> {
346 if name.is_empty() {
347 return Ok(()); }
349 validate_k8s_name(name)
350}
351
352fn validate_annotations(annotations: &BTreeMap<String, String>) -> Result<(), ValidationError> {
354 if annotations.len() > 50 {
355 return Err(ValidationError::new("too_many_annotations")
356 .with_message("maximum 50 annotations allowed".into()));
357 }
358 for (key, value) in annotations {
359 if key.len() > 253 {
361 return Err(ValidationError::new("annotation_key_too_long")
362 .with_message(format!("annotation key '{}' exceeds 253 characters", key).into()));
363 }
364 if value.len() > 262144 {
366 return Err(ValidationError::new("annotation_value_too_long")
367 .with_message(format!("annotation '{}' value exceeds 256KB", key).into()));
368 }
369 }
370 Ok(())
371}
372
373fn validate_labels(labels: &BTreeMap<String, String>) -> Result<(), ValidationError> {
375 if labels.len() > 20 {
376 return Err(ValidationError::new("too_many_labels")
377 .with_message("maximum 20 labels allowed".into()));
378 }
379 for (key, value) in labels {
380 if key.len() > 253 || value.len() > 63 {
381 return Err(ValidationError::new("label_too_long")
382 .with_message("label key max 253 chars, value max 63 chars".into()));
383 }
384 if key.starts_with("app.kubernetes.io/") {
386 return Err(ValidationError::new("reserved_label").with_message(
387 format!("label '{}' uses reserved prefix app.kubernetes.io/", key).into(),
388 ));
389 }
390 }
391 Ok(())
392}
393
394#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
396#[serde(rename_all = "camelCase")]
397pub struct StorageSpec {
398 #[serde(default = "default_storage_size")]
400 #[validate(custom(function = "validate_quantity"))]
401 pub size: String,
402
403 #[serde(default)]
405 #[validate(custom(function = "validate_optional_k8s_name"))]
406 pub storage_class_name: Option<String>,
407
408 #[serde(default = "default_access_modes")]
410 #[validate(length(min = 1, max = 3, message = "access modes must have 1-3 entries"))]
411 #[validate(custom(function = "validate_access_modes"))]
412 pub access_modes: Vec<String>,
413}
414
415fn validate_access_modes(modes: &[String]) -> Result<(), ValidationError> {
417 let valid_modes = [
418 "ReadWriteOnce",
419 "ReadOnlyMany",
420 "ReadWriteMany",
421 "ReadWriteOncePod",
422 ];
423 for mode in modes {
424 if !valid_modes.contains(&mode.as_str()) {
425 return Err(ValidationError::new("invalid_access_mode")
426 .with_message(format!("'{}' is not a valid access mode", mode).into()));
427 }
428 }
429 Ok(())
430}
431
432impl Default for StorageSpec {
433 fn default() -> Self {
434 Self {
435 size: default_storage_size(),
436 storage_class_name: None,
437 access_modes: default_access_modes(),
438 }
439 }
440}
441
442#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
444#[serde(rename_all = "camelCase")]
445pub struct BrokerConfig {
446 #[serde(default = "default_partitions")]
448 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
449 pub default_partitions: i32,
450
451 #[serde(default = "default_replication_factor")]
453 #[validate(range(
454 min = 1,
455 max = 10,
456 message = "replication factor must be between 1 and 10"
457 ))]
458 pub default_replication_factor: i32,
459
460 #[serde(default = "default_log_retention_hours")]
462 #[validate(range(
463 min = 1,
464 max = 8760,
465 message = "retention hours must be between 1 and 8760"
466 ))]
467 pub log_retention_hours: i32,
468
469 #[serde(default = "default_log_segment_bytes")]
471 #[validate(custom(function = "validate_segment_size"))]
472 pub log_segment_bytes: i64,
473
474 #[serde(default = "default_max_message_bytes")]
476 #[validate(custom(function = "validate_message_size"))]
477 pub max_message_bytes: i64,
478
479 #[serde(default = "default_true")]
481 pub auto_create_topics: bool,
482
483 #[serde(default = "default_true")]
485 pub compression_enabled: bool,
486
487 #[serde(default = "default_compression")]
489 #[validate(custom(function = "validate_compression_type"))]
490 pub compression_type: String,
491
492 #[serde(default = "default_election_timeout")]
494 #[validate(range(
495 min = 100,
496 max = 60000,
497 message = "election timeout must be between 100ms and 60s"
498 ))]
499 pub raft_election_timeout_ms: i32,
500
501 #[serde(default = "default_heartbeat_interval")]
503 #[validate(range(
504 min = 10,
505 max = 10000,
506 message = "heartbeat interval must be between 10ms and 10s"
507 ))]
508 pub raft_heartbeat_interval_ms: i32,
509
510 #[serde(default)]
512 #[validate(custom(function = "validate_raw_config"))]
513 pub raw: BTreeMap<String, String>,
514}
515
516fn validate_compression_type(compression: &str) -> Result<(), ValidationError> {
518 match compression {
519 "lz4" | "zstd" | "none" | "snappy" | "gzip" => Ok(()),
520 _ => Err(ValidationError::new("invalid_compression")
521 .with_message("compression must be one of: lz4, zstd, none, snappy, gzip".into())),
522 }
523}
524
525fn validate_segment_size(size: i64) -> Result<(), ValidationError> {
527 const MIN_SEGMENT_SIZE: i64 = 1_048_576; const MAX_SEGMENT_SIZE: i64 = 10_737_418_240; if !(MIN_SEGMENT_SIZE..=MAX_SEGMENT_SIZE).contains(&size) {
530 return Err(ValidationError::new("invalid_segment_size")
531 .with_message("segment size must be between 1MB and 10GB".into()));
532 }
533 Ok(())
534}
535
536fn validate_message_size(size: i64) -> Result<(), ValidationError> {
538 const MIN_MESSAGE_SIZE: i64 = 1_024; const MAX_MESSAGE_SIZE: i64 = 104_857_600; if !(MIN_MESSAGE_SIZE..=MAX_MESSAGE_SIZE).contains(&size) {
541 return Err(ValidationError::new("invalid_message_size")
542 .with_message("max message size must be between 1KB and 100MB".into()));
543 }
544 Ok(())
545}
546
547fn validate_raw_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
549 if config.len() > 50 {
550 return Err(ValidationError::new("too_many_raw_configs")
551 .with_message("maximum 50 raw configuration entries allowed".into()));
552 }
553 for (key, value) in config {
554 if key.len() > 128 || value.len() > 4096 {
555 return Err(ValidationError::new("raw_config_too_long")
556 .with_message("raw config key max 128 chars, value max 4096 chars".into()));
557 }
558 let forbidden_keys = ["command", "args", "image", "securityContext", "volumes"];
560 if forbidden_keys.contains(&key.as_str()) {
561 return Err(ValidationError::new("forbidden_raw_config")
562 .with_message(format!("raw config key '{}' is not allowed", key).into()));
563 }
564 }
565 Ok(())
566}
567
568impl Default for BrokerConfig {
569 fn default() -> Self {
570 Self {
571 default_partitions: default_partitions(),
572 default_replication_factor: default_replication_factor(),
573 log_retention_hours: default_log_retention_hours(),
574 log_segment_bytes: default_log_segment_bytes(),
575 max_message_bytes: default_max_message_bytes(),
576 auto_create_topics: default_true(),
577 compression_enabled: default_true(),
578 compression_type: default_compression(),
579 raft_election_timeout_ms: default_election_timeout(),
580 raft_heartbeat_interval_ms: default_heartbeat_interval(),
581 raw: BTreeMap::new(),
582 }
583 }
584}
585
586#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
588#[serde(rename_all = "camelCase")]
589#[validate(schema(function = "validate_tls_spec"))]
590pub struct TlsSpec {
591 #[serde(default)]
593 pub enabled: bool,
594
595 #[serde(default)]
597 #[validate(custom(function = "validate_optional_k8s_name"))]
598 pub cert_secret_name: Option<String>,
599
600 #[serde(default)]
602 pub mtls_enabled: bool,
603
604 #[serde(default)]
606 #[validate(custom(function = "validate_optional_k8s_name"))]
607 pub ca_secret_name: Option<String>,
608}
609
610fn validate_tls_spec(spec: &TlsSpec) -> Result<(), validator::ValidationError> {
616 if spec.enabled && spec.cert_secret_name.is_none() {
617 return Err(validator::ValidationError::new("tls_cert_required"));
618 }
619 if spec.mtls_enabled && !spec.enabled {
620 return Err(validator::ValidationError::new("mtls_requires_tls"));
621 }
622 if spec.mtls_enabled && spec.ca_secret_name.is_none() {
623 return Err(validator::ValidationError::new("mtls_ca_required"));
624 }
625 Ok(())
626}
627
628#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
630#[serde(rename_all = "camelCase")]
631pub struct MetricsSpec {
632 #[serde(default = "default_true")]
634 pub enabled: bool,
635
636 #[serde(default = "default_metrics_port")]
638 #[validate(range(
639 min = 1024,
640 max = 65535,
641 message = "metrics port must be between 1024 and 65535"
642 ))]
643 pub port: i32,
644
645 #[serde(default)]
647 #[validate(nested)]
648 pub service_monitor: ServiceMonitorSpec,
649}
650
651impl Default for MetricsSpec {
652 fn default() -> Self {
653 Self {
654 enabled: true,
655 port: 9090,
656 service_monitor: ServiceMonitorSpec::default(),
657 }
658 }
659}
660
661#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
663#[serde(rename_all = "camelCase")]
664pub struct ServiceMonitorSpec {
665 #[serde(default)]
667 pub enabled: bool,
668
669 #[serde(default)]
671 #[validate(custom(function = "validate_optional_k8s_name"))]
672 pub namespace: Option<String>,
673
674 #[serde(default = "default_scrape_interval")]
676 #[validate(custom(function = "validate_duration"))]
677 pub interval: String,
678
679 #[serde(default)]
681 #[validate(custom(function = "validate_service_monitor_labels"))]
682 pub labels: BTreeMap<String, String>,
683}
684
685fn validate_duration(duration: &str) -> Result<(), ValidationError> {
687 static DURATION_REGEX: LazyLock<Regex> =
688 LazyLock::new(|| Regex::new(r"^([0-9]+[smh])+$").unwrap());
689 if !DURATION_REGEX.is_match(duration) {
690 return Err(ValidationError::new("invalid_duration").with_message(
691 format!("'{}' is not a valid duration (e.g., 30s, 1m)", duration).into(),
692 ));
693 }
694 Ok(())
695}
696
697fn validate_service_monitor_labels(
699 labels: &BTreeMap<String, String>,
700) -> Result<(), ValidationError> {
701 if labels.len() > 10 {
702 return Err(ValidationError::new("too_many_labels")
703 .with_message("maximum 10 ServiceMonitor labels allowed".into()));
704 }
705 Ok(())
706}
707
708#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
710#[serde(rename_all = "camelCase")]
711pub struct PdbSpec {
712 #[serde(default = "default_true")]
714 pub enabled: bool,
715
716 #[serde(default)]
719 #[validate(custom(function = "validate_optional_int_or_percent"))]
720 pub min_available: Option<String>,
721
722 #[serde(default = "default_max_unavailable")]
725 #[validate(custom(function = "validate_optional_int_or_percent"))]
726 pub max_unavailable: Option<String>,
727}
728
729fn validate_optional_int_or_percent(value: &str) -> Result<(), ValidationError> {
731 if value.is_empty() {
732 return Ok(());
733 }
734 static INT_OR_PERCENT_REGEX: LazyLock<Regex> =
736 LazyLock::new(|| Regex::new(r"^([0-9]+|[0-9]+%)$").unwrap());
737 if !INT_OR_PERCENT_REGEX.is_match(value) {
738 return Err(ValidationError::new("invalid_int_or_percent").with_message(
739 format!(
740 "'{}' must be an integer or percentage (e.g., '1' or '25%')",
741 value
742 )
743 .into(),
744 ));
745 }
746 Ok(())
747}
748
749impl Default for PdbSpec {
750 fn default() -> Self {
751 Self {
752 enabled: true,
753 min_available: None,
754 max_unavailable: Some("1".to_string()),
755 }
756 }
757}
758
759#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
761#[serde(rename_all = "camelCase")]
762pub struct ProbeSpec {
763 #[serde(default = "default_true")]
765 pub enabled: bool,
766
767 #[serde(default = "default_initial_delay")]
769 #[validate(range(min = 0, max = 3600, message = "initial delay must be 0-3600 seconds"))]
770 pub initial_delay_seconds: i32,
771
772 #[serde(default = "default_period")]
774 #[validate(range(min = 1, max = 300, message = "period must be 1-300 seconds"))]
775 pub period_seconds: i32,
776
777 #[serde(default = "default_timeout")]
779 #[validate(range(min = 1, max = 60, message = "timeout must be 1-60 seconds"))]
780 pub timeout_seconds: i32,
781
782 #[serde(default = "default_one")]
784 #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
785 pub success_threshold: i32,
786
787 #[serde(default = "default_three")]
789 #[validate(range(min = 1, max = 30, message = "failure threshold must be 1-30"))]
790 pub failure_threshold: i32,
791}
792
793impl Default for ProbeSpec {
794 fn default() -> Self {
795 Self {
796 enabled: true,
797 initial_delay_seconds: 30,
798 period_seconds: 10,
799 timeout_seconds: 5,
800 success_threshold: 1,
801 failure_threshold: 3,
802 }
803 }
804}
805
806#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
808#[serde(rename_all = "camelCase")]
809pub struct RivvenClusterStatus {
810 pub phase: ClusterPhase,
812
813 pub replicas: i32,
815
816 pub ready_replicas: i32,
818
819 pub updated_replicas: i32,
821
822 pub observed_generation: i64,
824
825 #[serde(default)]
827 pub conditions: Vec<ClusterCondition>,
828
829 #[serde(default)]
831 pub broker_endpoints: Vec<String>,
832
833 pub leader: Option<String>,
835
836 pub last_updated: Option<String>,
838
839 pub message: Option<String>,
841}
842
843#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
845pub enum ClusterPhase {
846 #[default]
848 Pending,
849 Provisioning,
851 Running,
853 Updating,
855 Degraded,
857 Failed,
859 Terminating,
861}
862
863#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
865#[serde(rename_all = "camelCase")]
866pub struct ClusterCondition {
867 #[serde(rename = "type")]
869 pub condition_type: String,
870
871 pub status: String,
873
874 pub reason: Option<String>,
876
877 pub message: Option<String>,
879
880 pub last_transition_time: Option<String>,
882}
883
884fn default_replicas() -> i32 {
886 3
887}
888
889fn default_version() -> String {
890 "0.0.1".to_string()
891}
892
893fn default_image_pull_policy() -> String {
894 "IfNotPresent".to_string()
895}
896
897fn default_storage_size() -> String {
898 "10Gi".to_string()
899}
900
901fn default_access_modes() -> Vec<String> {
902 vec!["ReadWriteOnce".to_string()]
903}
904
905fn default_partitions() -> i32 {
906 3
907}
908
909fn default_replication_factor() -> i32 {
910 2
911}
912
913fn default_log_retention_hours() -> i32 {
914 168 }
916
917fn default_log_segment_bytes() -> i64 {
918 1073741824 }
920
921fn default_max_message_bytes() -> i64 {
922 1048576 }
924
925fn default_compression() -> String {
926 "lz4".to_string()
927}
928
929fn default_election_timeout() -> i32 {
930 1000
931}
932
933fn default_heartbeat_interval() -> i32 {
934 100
935}
936
937fn default_metrics_port() -> i32 {
938 9090
939}
940
941fn default_scrape_interval() -> String {
942 "30s".to_string()
943}
944
945fn default_max_unavailable() -> Option<String> {
946 Some("1".to_string())
947}
948
949fn default_initial_delay() -> i32 {
950 30
951}
952
953fn default_period() -> i32 {
954 10
955}
956
957fn default_timeout() -> i32 {
958 5
959}
960
961fn default_one() -> i32 {
962 1
963}
964
965fn default_three() -> i32 {
966 3
967}
968
969fn default_true() -> bool {
970 true
971}
972
973impl RivvenClusterSpec {
974 pub fn get_image(&self) -> String {
976 if let Some(ref image) = self.image {
977 image.clone()
978 } else {
979 format!("ghcr.io/hupe1980/rivven:{}", self.version)
980 }
981 }
982
983 pub fn get_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
985 let mut labels = BTreeMap::new();
986 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
987 labels.insert(
988 "app.kubernetes.io/instance".to_string(),
989 cluster_name.to_string(),
990 );
991 labels.insert(
992 "app.kubernetes.io/component".to_string(),
993 "broker".to_string(),
994 );
995 labels.insert(
996 "app.kubernetes.io/managed-by".to_string(),
997 "rivven-operator".to_string(),
998 );
999 labels.insert(
1000 "app.kubernetes.io/version".to_string(),
1001 self.version.clone(),
1002 );
1003 labels
1004 }
1005
1006 pub fn get_selector_labels(&self, cluster_name: &str) -> BTreeMap<String, String> {
1008 let mut labels = BTreeMap::new();
1009 labels.insert("app.kubernetes.io/name".to_string(), "rivven".to_string());
1010 labels.insert(
1011 "app.kubernetes.io/instance".to_string(),
1012 cluster_name.to_string(),
1013 );
1014 labels
1015 }
1016}
1017
1018#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1028#[kube(
1029 group = "rivven.hupe1980.github.io",
1030 version = "v1alpha1",
1031 kind = "RivvenConnect",
1032 plural = "rivvenconnects",
1033 shortname = "rcon",
1034 namespaced,
1035 status = "RivvenConnectStatus",
1036 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
1037 printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
1038 printcolumn = r#"{"name":"Sources","type":"integer","jsonPath":".status.sourcesRunning"}"#,
1039 printcolumn = r#"{"name":"Sinks","type":"integer","jsonPath":".status.sinksRunning"}"#,
1040 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
1041 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
1042)]
1043#[serde(rename_all = "camelCase")]
1044pub struct RivvenConnectSpec {
1045 #[validate(nested)]
1047 pub cluster_ref: ClusterReference,
1048
1049 #[serde(default = "default_connect_replicas")]
1051 #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
1052 pub replicas: i32,
1053
1054 #[serde(default = "default_version")]
1056 pub version: String,
1057
1058 #[serde(default)]
1060 #[validate(custom(function = "validate_optional_image"))]
1061 pub image: Option<String>,
1062
1063 #[serde(default = "default_image_pull_policy")]
1065 #[validate(custom(function = "validate_pull_policy"))]
1066 pub image_pull_policy: String,
1067
1068 #[serde(default)]
1070 pub image_pull_secrets: Vec<String>,
1071
1072 #[serde(default)]
1074 pub resources: Option<serde_json::Value>,
1075
1076 #[serde(default)]
1078 #[validate(nested)]
1079 pub config: ConnectConfigSpec,
1080
1081 #[serde(default)]
1083 #[validate(length(max = 50, message = "maximum 50 source connectors allowed"))]
1084 pub sources: Vec<SourceConnectorSpec>,
1085
1086 #[serde(default)]
1088 #[validate(length(max = 50, message = "maximum 50 sink connectors allowed"))]
1089 pub sinks: Vec<SinkConnectorSpec>,
1090
1091 #[serde(default)]
1093 #[validate(nested)]
1094 pub settings: GlobalConnectSettings,
1095
1096 #[serde(default)]
1098 #[validate(nested)]
1099 pub tls: ConnectTlsSpec,
1100
1101 #[serde(default)]
1103 #[validate(custom(function = "validate_annotations"))]
1104 pub pod_annotations: BTreeMap<String, String>,
1105
1106 #[serde(default)]
1108 #[validate(custom(function = "validate_labels"))]
1109 pub pod_labels: BTreeMap<String, String>,
1110
1111 #[serde(default)]
1113 #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
1114 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
1115
1116 #[serde(default)]
1118 pub node_selector: BTreeMap<String, String>,
1119
1120 #[serde(default)]
1122 #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
1123 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
1124
1125 #[serde(default)]
1127 pub affinity: Option<serde_json::Value>,
1128
1129 #[serde(default)]
1131 #[validate(custom(function = "validate_optional_k8s_name"))]
1132 pub service_account: Option<String>,
1133
1134 #[serde(default)]
1136 pub security_context: Option<serde_json::Value>,
1137
1138 #[serde(default)]
1140 pub container_security_context: Option<serde_json::Value>,
1141}
1142
1143#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1145#[serde(rename_all = "camelCase")]
1146pub struct ClusterReference {
1147 #[validate(length(min = 1, max = 63, message = "cluster name must be 1-63 characters"))]
1149 #[validate(custom(function = "validate_k8s_name"))]
1150 pub name: String,
1151
1152 #[serde(default)]
1154 #[validate(custom(function = "validate_optional_k8s_name"))]
1155 pub namespace: Option<String>,
1156}
1157
1158#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1193#[kube(
1194 group = "rivven.hupe1980.github.io",
1195 version = "v1alpha1",
1196 kind = "RivvenTopic",
1197 plural = "rivventopics",
1198 shortname = "rt",
1199 namespaced,
1200 status = "RivvenTopicStatus",
1201 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
1202 printcolumn = r#"{"name":"Partitions","type":"integer","jsonPath":".spec.partitions"}"#,
1203 printcolumn = r#"{"name":"Replication","type":"integer","jsonPath":".spec.replicationFactor"}"#,
1204 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
1205 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
1206)]
1207#[serde(rename_all = "camelCase")]
1208pub struct RivvenTopicSpec {
1209 #[validate(nested)]
1211 pub cluster_ref: ClusterReference,
1212
1213 #[serde(default = "default_rivven_topic_partitions")]
1216 #[validate(range(
1217 min = 1,
1218 max = 10000,
1219 message = "partitions must be between 1 and 10000"
1220 ))]
1221 pub partitions: i32,
1222
1223 #[serde(default = "default_rivven_topic_replication")]
1226 #[validate(range(
1227 min = 1,
1228 max = 10,
1229 message = "replication factor must be between 1 and 10"
1230 ))]
1231 pub replication_factor: i32,
1232
1233 #[serde(default)]
1235 #[validate(nested)]
1236 pub config: TopicConfig,
1237
1238 #[serde(default)]
1240 #[validate(length(max = 100, message = "maximum 100 ACL entries allowed"))]
1241 #[validate(custom(function = "validate_topic_acls"))]
1242 pub acls: Vec<TopicAcl>,
1243
1244 #[serde(default = "default_true")]
1247 pub delete_on_remove: bool,
1248
1249 #[serde(default)]
1251 #[validate(custom(function = "validate_labels"))]
1252 pub topic_labels: BTreeMap<String, String>,
1253}
1254
1255fn default_rivven_topic_partitions() -> i32 {
1256 3
1257}
1258
1259fn default_rivven_topic_replication() -> i32 {
1260 1
1261}
1262
1263#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1265#[serde(rename_all = "camelCase")]
1266pub struct TopicConfig {
1267 #[serde(default = "default_topic_retention_ms")]
1270 #[validate(range(
1271 min = 3600000,
1272 max = 315360000000_i64,
1273 message = "retention must be between 1 hour and 10 years"
1274 ))]
1275 pub retention_ms: i64,
1276
1277 #[serde(default = "default_topic_retention_bytes")]
1280 #[validate(custom(function = "validate_topic_retention_bytes"))]
1281 pub retention_bytes: i64,
1282
1283 #[serde(default = "default_topic_segment_bytes")]
1285 #[validate(custom(function = "validate_segment_size"))]
1286 pub segment_bytes: i64,
1287
1288 #[serde(default = "default_topic_cleanup_policy")]
1290 #[validate(custom(function = "validate_topic_cleanup_policy"))]
1291 pub cleanup_policy: String,
1292
1293 #[serde(default = "default_topic_compression")]
1295 #[validate(custom(function = "validate_topic_compression"))]
1296 pub compression_type: String,
1297
1298 #[serde(default = "default_topic_min_isr")]
1300 #[validate(range(min = 1, max = 10, message = "min ISR must be between 1 and 10"))]
1301 pub min_insync_replicas: i32,
1302
1303 #[serde(default = "default_max_message_bytes")]
1305 #[validate(custom(function = "validate_message_size"))]
1306 pub max_message_bytes: i64,
1307
1308 #[serde(default = "default_true")]
1310 pub message_timestamp_enabled: bool,
1311
1312 #[serde(default = "default_topic_timestamp_type")]
1314 #[validate(custom(function = "validate_topic_timestamp_type"))]
1315 pub message_timestamp_type: String,
1316
1317 #[serde(default = "default_true")]
1319 pub idempotent_writes: bool,
1320
1321 #[serde(default)]
1323 #[validate(range(min = 0, max = 86400000, message = "flush interval must be 0-24 hours"))]
1324 pub flush_interval_ms: i64,
1325
1326 #[serde(default)]
1328 #[validate(custom(function = "validate_topic_custom_config"))]
1329 pub custom: BTreeMap<String, String>,
1330}
1331
1332fn default_topic_retention_ms() -> i64 {
1333 604800000 }
1335
1336fn default_topic_retention_bytes() -> i64 {
1337 -1 }
1339
1340fn default_topic_segment_bytes() -> i64 {
1341 1073741824 }
1343
1344fn default_topic_cleanup_policy() -> String {
1345 "delete".to_string()
1346}
1347
1348fn default_topic_compression() -> String {
1349 "lz4".to_string()
1350}
1351
1352fn default_topic_min_isr() -> i32 {
1353 1
1354}
1355
1356fn default_topic_timestamp_type() -> String {
1357 "CreateTime".to_string()
1358}
1359
1360fn validate_topic_retention_bytes(value: i64) -> Result<(), ValidationError> {
1361 if value == -1 || (1048576..=10995116277760).contains(&value) {
1362 Ok(())
1363 } else {
1364 Err(ValidationError::new("invalid_retention_bytes")
1365 .with_message("retention_bytes must be -1 (unlimited) or 1MB-10TB".into()))
1366 }
1367}
1368
1369fn validate_topic_cleanup_policy(policy: &str) -> Result<(), ValidationError> {
1370 match policy {
1371 "delete" | "compact" | "delete,compact" | "compact,delete" => Ok(()),
1372 _ => Err(ValidationError::new("invalid_cleanup_policy").with_message(
1373 "cleanup_policy must be 'delete', 'compact', or 'delete,compact'".into(),
1374 )),
1375 }
1376}
1377
1378fn validate_topic_compression(compression: &str) -> Result<(), ValidationError> {
1379 match compression {
1380 "none" | "gzip" | "snappy" | "lz4" | "zstd" | "producer" => Ok(()),
1381 _ => Err(ValidationError::new("invalid_compression")
1382 .with_message("compression must be none, gzip, snappy, lz4, zstd, or producer".into())),
1383 }
1384}
1385
1386fn validate_topic_timestamp_type(ts_type: &str) -> Result<(), ValidationError> {
1387 match ts_type {
1388 "CreateTime" | "LogAppendTime" => Ok(()),
1389 _ => Err(ValidationError::new("invalid_timestamp_type")
1390 .with_message("timestamp type must be 'CreateTime' or 'LogAppendTime'".into())),
1391 }
1392}
1393
1394fn validate_topic_custom_config(config: &BTreeMap<String, String>) -> Result<(), ValidationError> {
1395 if config.len() > 50 {
1396 return Err(ValidationError::new("too_many_custom_configs")
1397 .with_message("maximum 50 custom config entries allowed".into()));
1398 }
1399 for (key, value) in config {
1400 if key.len() > 128 || value.len() > 4096 {
1401 return Err(ValidationError::new("config_too_long")
1402 .with_message("config key max 128 chars, value max 4096 chars".into()));
1403 }
1404 let protected = [
1406 "retention.ms",
1407 "retention.bytes",
1408 "segment.bytes",
1409 "cleanup.policy",
1410 ];
1411 if protected.contains(&key.as_str()) {
1412 return Err(ValidationError::new("protected_config").with_message(
1413 format!(
1414 "'{}' must be set via dedicated field, not custom config",
1415 key
1416 )
1417 .into(),
1418 ));
1419 }
1420 }
1421 Ok(())
1422}
1423
1424#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1426#[serde(rename_all = "camelCase")]
1427pub struct TopicAcl {
1428 #[validate(length(min = 1, max = 256, message = "principal must be 1-256 characters"))]
1430 #[validate(custom(function = "validate_principal"))]
1431 pub principal: String,
1432
1433 #[validate(length(min = 1, max = 7, message = "must specify 1-7 operations"))]
1435 #[validate(custom(function = "validate_operations"))]
1436 pub operations: Vec<String>,
1437
1438 #[serde(default = "default_permission_type")]
1440 #[validate(custom(function = "validate_permission_type"))]
1441 pub permission_type: String,
1442
1443 #[serde(default = "default_acl_host")]
1445 #[validate(length(max = 256, message = "host must be max 256 characters"))]
1446 pub host: String,
1447}
1448
1449fn default_permission_type() -> String {
1450 "Allow".to_string()
1451}
1452
1453fn default_acl_host() -> String {
1454 "*".to_string()
1455}
1456
1457fn validate_principal(principal: &str) -> Result<(), ValidationError> {
1458 if principal == "*" {
1459 return Ok(());
1460 }
1461 if let Some((prefix, name)) = principal.split_once(':') {
1462 if !["user", "group", "User", "Group"].contains(&prefix) {
1463 return Err(ValidationError::new("invalid_principal_prefix")
1464 .with_message("principal prefix must be 'user:' or 'group:'".into()));
1465 }
1466 if name.is_empty() || name.len() > 128 {
1467 return Err(ValidationError::new("invalid_principal_name")
1468 .with_message("principal name must be 1-128 characters".into()));
1469 }
1470 Ok(())
1471 } else {
1472 Err(ValidationError::new("invalid_principal_format")
1473 .with_message("principal must be '*' or 'user:name' or 'group:name'".into()))
1474 }
1475}
1476
1477fn validate_operations(ops: &[String]) -> Result<(), ValidationError> {
1478 let valid_ops = [
1479 "Read",
1480 "Write",
1481 "Create",
1482 "Delete",
1483 "Alter",
1484 "Describe",
1485 "All",
1486 "DescribeConfigs",
1487 "AlterConfigs",
1488 ];
1489 for op in ops {
1490 if !valid_ops.contains(&op.as_str()) {
1491 return Err(ValidationError::new("invalid_operation").with_message(
1492 format!("'{}' is not a valid operation. Valid: {:?}", op, valid_ops).into(),
1493 ));
1494 }
1495 }
1496 Ok(())
1497}
1498
1499fn validate_permission_type(perm: &str) -> Result<(), ValidationError> {
1500 match perm {
1501 "Allow" | "Deny" => Ok(()),
1502 _ => Err(ValidationError::new("invalid_permission_type")
1503 .with_message("permission_type must be 'Allow' or 'Deny'".into())),
1504 }
1505}
1506
1507fn validate_topic_acls(acls: &[TopicAcl]) -> Result<(), ValidationError> {
1508 let mut seen = std::collections::HashSet::new();
1510 for acl in acls {
1511 for op in &acl.operations {
1512 let key = format!("{}:{}", acl.principal, op);
1513 if !seen.insert(key.clone()) {
1514 return Err(ValidationError::new("duplicate_acl")
1515 .with_message(format!("duplicate ACL entry for {}", key).into()));
1516 }
1517 }
1518 }
1519 Ok(())
1520}
1521
1522#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
1524#[serde(rename_all = "camelCase")]
1525pub struct RivvenTopicStatus {
1526 #[serde(default)]
1528 pub phase: String,
1529
1530 #[serde(default)]
1532 pub message: String,
1533
1534 #[serde(default)]
1536 pub current_partitions: i32,
1537
1538 #[serde(default)]
1540 pub current_replication_factor: i32,
1541
1542 #[serde(default)]
1544 pub topic_exists: bool,
1545
1546 #[serde(default)]
1548 pub observed_generation: i64,
1549
1550 #[serde(default)]
1552 pub conditions: Vec<TopicCondition>,
1553
1554 #[serde(default)]
1556 pub last_sync_time: Option<String>,
1557
1558 #[serde(default)]
1560 pub partition_info: Vec<PartitionInfo>,
1561}
1562
1563#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1565#[serde(rename_all = "camelCase")]
1566pub struct TopicCondition {
1567 pub r#type: String,
1569
1570 pub status: String,
1572
1573 pub reason: String,
1575
1576 pub message: String,
1578
1579 pub last_transition_time: String,
1581}
1582
1583#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
1585#[serde(rename_all = "camelCase")]
1586pub struct PartitionInfo {
1587 pub partition: i32,
1589
1590 pub leader: i32,
1592
1593 pub replicas: Vec<i32>,
1595
1596 pub isr: Vec<i32>,
1598}
1599
1600#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1602#[serde(rename_all = "camelCase")]
1603pub struct ConnectConfigSpec {
1604 #[serde(default = "default_state_dir")]
1606 pub state_dir: String,
1607
1608 #[serde(default = "default_log_level")]
1610 #[validate(custom(function = "validate_log_level"))]
1611 pub log_level: String,
1612}
1613
1614fn default_state_dir() -> String {
1615 "/data/connect-state".to_string()
1616}
1617
1618fn default_log_level() -> String {
1619 "info".to_string()
1620}
1621
1622fn validate_log_level(level: &str) -> Result<(), ValidationError> {
1623 match level {
1624 "trace" | "debug" | "info" | "warn" | "error" => Ok(()),
1625 _ => Err(ValidationError::new("invalid_log_level")
1626 .with_message("log level must be one of: trace, debug, info, warn, error".into())),
1627 }
1628}
1629
1630#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1636#[serde(rename_all = "camelCase")]
1637pub struct SourceConnectorSpec {
1638 #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
1640 #[validate(custom(function = "validate_k8s_name"))]
1641 pub name: String,
1642
1643 #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
1645 #[validate(custom(function = "validate_connector_type"))]
1646 pub connector: String,
1647
1648 #[validate(length(min = 1, max = 255, message = "topic must be 1-255 characters"))]
1650 pub topic: String,
1651
1652 #[serde(default)]
1657 pub topic_routing: Option<String>,
1658
1659 #[serde(default = "default_true")]
1661 pub enabled: bool,
1662
1663 #[serde(default)]
1667 pub config: serde_json::Value,
1668
1669 #[serde(default)]
1672 #[validate(custom(function = "validate_optional_k8s_name"))]
1673 pub config_secret_ref: Option<String>,
1674
1675 #[serde(default)]
1677 #[validate(nested)]
1678 pub topic_config: SourceTopicConfigSpec,
1679}
1680
1681fn validate_connector_type(connector: &str) -> Result<(), ValidationError> {
1682 static CONNECTOR_REGEX: LazyLock<Regex> =
1684 LazyLock::new(|| Regex::new(r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$").unwrap());
1685 if !CONNECTOR_REGEX.is_match(connector) {
1686 return Err(ValidationError::new("invalid_connector_type")
1687 .with_message("connector type must be lowercase alphanumeric with hyphens".into()));
1688 }
1689 Ok(())
1690}
1691
1692#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1694#[serde(rename_all = "camelCase")]
1695pub struct TableSpec {
1696 #[serde(default)]
1698 pub schema: Option<String>,
1699
1700 #[validate(length(min = 1, max = 128, message = "table name must be 1-128 characters"))]
1702 pub table: String,
1703
1704 #[serde(default)]
1706 pub topic: Option<String>,
1707
1708 #[serde(default)]
1710 #[validate(length(max = 500, message = "maximum 500 columns per table"))]
1711 pub columns: Vec<String>,
1712
1713 #[serde(default)]
1715 #[validate(length(max = 500, message = "maximum 500 excluded columns per table"))]
1716 pub exclude_columns: Vec<String>,
1717
1718 #[serde(default)]
1720 pub column_masks: std::collections::BTreeMap<String, String>,
1721}
1722
1723#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1725#[serde(rename_all = "camelCase")]
1726pub struct SourceTopicConfigSpec {
1727 #[serde(default)]
1729 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
1730 pub partitions: Option<i32>,
1731
1732 #[serde(default)]
1734 #[validate(range(
1735 min = 1,
1736 max = 10,
1737 message = "replication factor must be between 1 and 10"
1738 ))]
1739 pub replication_factor: Option<i32>,
1740
1741 #[serde(default)]
1743 pub auto_create: Option<bool>,
1744}
1745
1746#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1752#[serde(rename_all = "camelCase")]
1753pub struct SinkConnectorSpec {
1754 #[validate(length(min = 1, max = 63, message = "name must be 1-63 characters"))]
1756 #[validate(custom(function = "validate_k8s_name"))]
1757 pub name: String,
1758
1759 #[validate(length(min = 1, max = 64, message = "connector type must be 1-64 characters"))]
1761 #[validate(custom(function = "validate_connector_type"))]
1762 pub connector: String,
1763
1764 #[validate(length(min = 1, max = 100, message = "must have 1-100 topics"))]
1766 pub topics: Vec<String>,
1767
1768 #[validate(length(
1770 min = 1,
1771 max = 128,
1772 message = "consumer group must be 1-128 characters"
1773 ))]
1774 pub consumer_group: String,
1775
1776 #[serde(default = "default_true")]
1778 pub enabled: bool,
1779
1780 #[serde(default = "default_start_offset")]
1782 #[validate(custom(function = "validate_start_offset"))]
1783 pub start_offset: String,
1784
1785 #[serde(default)]
1789 pub config: serde_json::Value,
1790
1791 #[serde(default)]
1794 #[validate(custom(function = "validate_optional_k8s_name"))]
1795 pub config_secret_ref: Option<String>,
1796
1797 #[serde(default)]
1799 #[validate(nested)]
1800 pub rate_limit: RateLimitSpec,
1801}
1802
1803fn default_start_offset() -> String {
1804 "latest".to_string()
1805}
1806
1807fn validate_start_offset(offset: &str) -> Result<(), ValidationError> {
1808 match offset {
1809 "earliest" | "latest" => Ok(()),
1810 s => {
1811 chrono::DateTime::parse_from_rfc3339(s).map_err(|e| {
1814 ValidationError::new("invalid_start_offset").with_message(
1815 format!(
1816 "start offset must be 'earliest', 'latest', or a valid \
1817 RFC 3339 timestamp (e.g. '2024-01-01T00:00:00Z'): {}",
1818 e
1819 )
1820 .into(),
1821 )
1822 })?;
1823 Ok(())
1824 }
1825 }
1826}
1827
1828#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1830#[serde(rename_all = "camelCase")]
1831pub struct RateLimitSpec {
1832 #[serde(default)]
1834 #[validate(range(
1835 min = 0,
1836 max = 1_000_000,
1837 message = "events per second must be 0-1000000"
1838 ))]
1839 pub events_per_second: u64,
1840
1841 #[serde(default)]
1843 #[validate(range(min = 0, max = 100_000, message = "burst capacity must be 0-100000"))]
1844 pub burst_capacity: Option<u64>,
1845}
1846
1847#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
1849#[serde(rename_all = "camelCase")]
1850pub struct GlobalConnectSettings {
1851 #[serde(default)]
1853 #[validate(nested)]
1854 pub topic: TopicSettingsSpec,
1855
1856 #[serde(default)]
1858 #[validate(nested)]
1859 pub retry: RetryConfigSpec,
1860
1861 #[serde(default)]
1863 #[validate(nested)]
1864 pub health: HealthConfigSpec,
1865
1866 #[serde(default)]
1868 #[validate(nested)]
1869 pub metrics: ConnectMetricsSpec,
1870}
1871
1872#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1874#[serde(rename_all = "camelCase")]
1875pub struct TopicSettingsSpec {
1876 #[serde(default = "default_true")]
1878 pub auto_create: bool,
1879
1880 #[serde(default = "default_topic_partitions")]
1882 #[validate(range(min = 1, max = 1000, message = "partitions must be between 1 and 1000"))]
1883 pub default_partitions: i32,
1884
1885 #[serde(default = "default_topic_replication")]
1887 #[validate(range(
1888 min = 1,
1889 max = 10,
1890 message = "replication factor must be between 1 and 10"
1891 ))]
1892 pub default_replication_factor: i32,
1893
1894 #[serde(default = "default_true")]
1896 pub require_topic_exists: bool,
1897}
1898
1899fn default_topic_partitions() -> i32 {
1900 1
1901}
1902
1903fn default_topic_replication() -> i32 {
1904 1
1905}
1906
1907impl Default for TopicSettingsSpec {
1908 fn default() -> Self {
1909 Self {
1910 auto_create: true,
1911 default_partitions: 1,
1912 default_replication_factor: 1,
1913 require_topic_exists: true,
1914 }
1915 }
1916}
1917
1918#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1920#[serde(rename_all = "camelCase")]
1921pub struct RetryConfigSpec {
1922 #[serde(default = "default_max_retries")]
1924 #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
1925 pub max_retries: i32,
1926
1927 #[serde(default = "default_initial_backoff_ms")]
1929 #[validate(range(min = 10, max = 60000, message = "initial backoff must be 10-60000ms"))]
1930 pub initial_backoff_ms: i64,
1931
1932 #[serde(default = "default_max_backoff_ms")]
1934 #[validate(range(
1935 min = 100,
1936 max = 3600000,
1937 message = "max backoff must be 100-3600000ms"
1938 ))]
1939 pub max_backoff_ms: i64,
1940
1941 #[serde(default = "default_backoff_multiplier")]
1943 pub backoff_multiplier: f64,
1944}
1945
1946fn default_max_retries() -> i32 {
1947 10
1948}
1949
1950fn default_initial_backoff_ms() -> i64 {
1951 100
1952}
1953
1954fn default_max_backoff_ms() -> i64 {
1955 30000
1956}
1957
1958fn default_backoff_multiplier() -> f64 {
1959 2.0
1960}
1961
1962impl Default for RetryConfigSpec {
1963 fn default() -> Self {
1964 Self {
1965 max_retries: 10,
1966 initial_backoff_ms: 100,
1967 max_backoff_ms: 30000,
1968 backoff_multiplier: 2.0,
1969 }
1970 }
1971}
1972
1973#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
1975#[serde(rename_all = "camelCase")]
1976pub struct HealthConfigSpec {
1977 #[serde(default)]
1979 pub enabled: bool,
1980
1981 #[serde(default = "default_health_port")]
1983 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
1984 pub port: i32,
1985
1986 #[serde(default = "default_health_path")]
1988 pub path: String,
1989}
1990
1991fn default_health_port() -> i32 {
1992 8080
1993}
1994
1995fn default_health_path() -> String {
1996 "/health".to_string()
1997}
1998
1999impl Default for HealthConfigSpec {
2000 fn default() -> Self {
2001 Self {
2002 enabled: false,
2003 port: 8080,
2004 path: "/health".to_string(),
2005 }
2006 }
2007}
2008
2009#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2011#[serde(rename_all = "camelCase")]
2012pub struct ConnectMetricsSpec {
2013 #[serde(default)]
2015 pub enabled: bool,
2016
2017 #[serde(default = "default_connect_metrics_port")]
2019 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
2020 pub port: i32,
2021}
2022
2023fn default_connect_metrics_port() -> i32 {
2024 9091
2025}
2026
2027impl Default for ConnectMetricsSpec {
2028 fn default() -> Self {
2029 Self {
2030 enabled: false,
2031 port: 9091,
2032 }
2033 }
2034}
2035
2036#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2038#[serde(rename_all = "camelCase")]
2039pub struct ConnectTlsSpec {
2040 #[serde(default)]
2042 pub enabled: bool,
2043
2044 #[serde(default)]
2046 #[validate(custom(function = "validate_optional_k8s_name"))]
2047 pub cert_secret_name: Option<String>,
2048
2049 #[serde(default)]
2051 pub mtls_enabled: bool,
2052
2053 #[serde(default)]
2055 #[validate(custom(function = "validate_optional_k8s_name"))]
2056 pub ca_secret_name: Option<String>,
2057
2058 #[serde(default)]
2060 pub insecure: bool,
2061}
2062
2063fn default_connect_replicas() -> i32 {
2064 1
2065}
2066
2067#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
2069#[serde(rename_all = "camelCase")]
2070pub struct RivvenConnectStatus {
2071 pub phase: ConnectPhase,
2073
2074 pub replicas: i32,
2076
2077 pub ready_replicas: i32,
2079
2080 pub sources_running: i32,
2082
2083 pub sinks_running: i32,
2085
2086 pub sources_total: i32,
2088
2089 pub sinks_total: i32,
2091
2092 pub observed_generation: i64,
2094
2095 #[serde(default)]
2097 pub conditions: Vec<ConnectCondition>,
2098
2099 #[serde(default)]
2101 pub connector_statuses: Vec<ConnectorStatus>,
2102
2103 pub last_updated: Option<String>,
2105
2106 pub message: Option<String>,
2108}
2109
2110#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
2112pub enum ConnectPhase {
2113 #[default]
2115 Pending,
2116 Starting,
2118 Running,
2120 Degraded,
2122 Failed,
2124 Terminating,
2126}
2127
2128#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2130#[serde(rename_all = "camelCase")]
2131pub struct ConnectCondition {
2132 #[serde(rename = "type")]
2134 pub condition_type: String,
2135
2136 pub status: String,
2138
2139 pub reason: Option<String>,
2141
2142 pub message: Option<String>,
2144
2145 pub last_transition_time: Option<String>,
2147}
2148
2149#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
2161#[serde(rename_all = "camelCase")]
2162pub struct ConnectorStatus {
2163 pub name: String,
2165
2166 pub connector_type: String,
2168
2169 pub kind: String,
2171
2172 pub state: String,
2174
2175 #[serde(default = "default_status_source")]
2179 pub status_source: String,
2180
2181 pub last_probed: Option<String>,
2185
2186 pub events_processed: i64,
2188
2189 pub last_error: Option<String>,
2191
2192 pub last_success_time: Option<String>,
2194}
2195
2196fn default_status_source() -> String {
2197 "synthetic".to_string()
2198}
2199
2200impl RivvenConnectSpec {
2201 pub fn get_image(&self) -> String {
2203 if let Some(ref image) = self.image {
2204 image.clone()
2205 } else {
2206 format!("ghcr.io/hupe1980/rivven-connect:{}", self.version)
2207 }
2208 }
2209
2210 pub fn get_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2212 let mut labels = BTreeMap::new();
2213 labels.insert(
2214 "app.kubernetes.io/name".to_string(),
2215 "rivven-connect".to_string(),
2216 );
2217 labels.insert(
2218 "app.kubernetes.io/instance".to_string(),
2219 connect_name.to_string(),
2220 );
2221 labels.insert(
2222 "app.kubernetes.io/component".to_string(),
2223 "connector".to_string(),
2224 );
2225 labels.insert(
2226 "app.kubernetes.io/managed-by".to_string(),
2227 "rivven-operator".to_string(),
2228 );
2229 labels.insert(
2230 "app.kubernetes.io/version".to_string(),
2231 self.version.clone(),
2232 );
2233 labels
2234 }
2235
2236 pub fn get_selector_labels(&self, connect_name: &str) -> BTreeMap<String, String> {
2238 let mut labels = BTreeMap::new();
2239 labels.insert(
2240 "app.kubernetes.io/name".to_string(),
2241 "rivven-connect".to_string(),
2242 );
2243 labels.insert(
2244 "app.kubernetes.io/instance".to_string(),
2245 connect_name.to_string(),
2246 );
2247 labels
2248 }
2249
2250 pub fn enabled_sources_count(&self) -> usize {
2252 self.sources.iter().filter(|s| s.enabled).count()
2253 }
2254
2255 pub fn enabled_sinks_count(&self) -> usize {
2257 self.sinks.iter().filter(|s| s.enabled).count()
2258 }
2259}
2260
2261#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2268#[serde(rename_all = "camelCase")]
2269pub struct SnapshotCdcConfigSpec {
2270 #[serde(default = "default_snapshot_batch_size")]
2272 #[validate(range(min = 100, max = 1000000, message = "batch size must be 100-1000000"))]
2273 pub batch_size: i32,
2274
2275 #[serde(default = "default_snapshot_parallel_tables")]
2277 #[validate(range(min = 1, max = 32, message = "parallel tables must be 1-32"))]
2278 pub parallel_tables: i32,
2279
2280 #[serde(default = "default_snapshot_query_timeout")]
2282 #[validate(range(min = 10, max = 3600, message = "query timeout must be 10-3600s"))]
2283 pub query_timeout_secs: i32,
2284
2285 #[serde(default)]
2287 #[validate(range(min = 0, max = 60000, message = "throttle delay must be 0-60000ms"))]
2288 pub throttle_delay_ms: i32,
2289
2290 #[serde(default = "default_snapshot_max_retries")]
2292 #[validate(range(min = 0, max = 10, message = "max retries must be 0-10"))]
2293 pub max_retries: i32,
2294
2295 #[serde(default)]
2297 pub include_tables: Vec<String>,
2298
2299 #[serde(default)]
2301 pub exclude_tables: Vec<String>,
2302}
2303
2304fn default_snapshot_batch_size() -> i32 {
2305 10_000
2306}
2307fn default_snapshot_parallel_tables() -> i32 {
2308 4
2309}
2310fn default_snapshot_query_timeout() -> i32 {
2311 300
2312}
2313fn default_snapshot_max_retries() -> i32 {
2314 3
2315}
2316
2317#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2319#[serde(rename_all = "camelCase")]
2320pub struct IncrementalSnapshotSpec {
2321 #[serde(default)]
2323 pub enabled: bool,
2324
2325 #[serde(default = "default_incremental_chunk_size")]
2327 #[validate(range(min = 100, max = 100000, message = "chunk size must be 100-100000"))]
2328 pub chunk_size: i32,
2329
2330 #[serde(default)]
2332 #[validate(custom(function = "validate_watermark_strategy"))]
2333 pub watermark_strategy: String,
2334
2335 #[serde(default)]
2337 pub watermark_signal_table: Option<String>,
2338
2339 #[serde(default = "default_incremental_max_chunks")]
2341 #[validate(range(min = 1, max = 16, message = "max concurrent chunks must be 1-16"))]
2342 pub max_concurrent_chunks: i32,
2343
2344 #[serde(default)]
2346 #[validate(range(min = 0, max = 60000, message = "chunk delay must be 0-60000ms"))]
2347 pub chunk_delay_ms: i32,
2348}
2349
2350fn default_incremental_chunk_size() -> i32 {
2351 1024
2352}
2353fn default_incremental_max_chunks() -> i32 {
2354 1
2355}
2356
2357fn validate_watermark_strategy(strategy: &str) -> Result<(), ValidationError> {
2358 match strategy {
2359 "" | "insert" | "update_and_insert" => Ok(()),
2360 _ => Err(ValidationError::new("invalid_watermark_strategy")
2361 .with_message("watermark strategy must be: insert or update_and_insert".into())),
2362 }
2363}
2364
2365#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2367#[serde(rename_all = "camelCase")]
2368pub struct SignalTableSpec {
2369 #[serde(default)]
2371 pub enabled: bool,
2372
2373 #[serde(default)]
2375 pub data_collection: Option<String>,
2376
2377 #[serde(default)]
2379 pub topic: Option<String>,
2380
2381 #[serde(default)]
2383 pub enabled_channels: Vec<String>,
2384
2385 #[serde(default = "default_signal_poll_interval")]
2387 pub poll_interval_ms: i32,
2388}
2389
2390fn default_signal_poll_interval() -> i32 {
2391 1000
2392}
2393
2394#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2396#[serde(rename_all = "camelCase")]
2397pub struct HeartbeatCdcSpec {
2398 #[serde(default)]
2400 pub enabled: bool,
2401
2402 #[serde(default = "default_heartbeat_cdc_interval")]
2404 #[validate(range(min = 1, max = 3600, message = "heartbeat interval must be 1-3600s"))]
2405 pub interval_secs: i32,
2406
2407 #[serde(default = "default_heartbeat_max_lag")]
2409 #[validate(range(min = 10, max = 86400, message = "max lag must be 10-86400s"))]
2410 pub max_lag_secs: i32,
2411
2412 #[serde(default)]
2414 pub emit_events: bool,
2415
2416 #[serde(default)]
2418 pub topic: Option<String>,
2419
2420 #[serde(default)]
2422 pub action_query: Option<String>,
2423}
2424
2425fn default_heartbeat_cdc_interval() -> i32 {
2426 10
2427}
2428fn default_heartbeat_max_lag() -> i32 {
2429 300
2430}
2431
2432#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2434#[serde(rename_all = "camelCase")]
2435pub struct DeduplicationCdcSpec {
2436 #[serde(default)]
2438 pub enabled: bool,
2439
2440 #[serde(default = "default_bloom_expected")]
2442 #[validate(range(
2443 min = 1000,
2444 max = 10000000,
2445 message = "bloom expected must be 1000-10M"
2446 ))]
2447 pub bloom_expected_insertions: i64,
2448
2449 #[serde(default = "default_bloom_fpp")]
2451 pub bloom_fpp: f64,
2452
2453 #[serde(default = "default_lru_size")]
2455 #[validate(range(min = 1000, max = 1000000, message = "LRU size must be 1000-1M"))]
2456 pub lru_size: i64,
2457
2458 #[serde(default = "default_dedup_window")]
2460 #[validate(range(min = 60, max = 604800, message = "window must be 60-604800s"))]
2461 pub window_secs: i64,
2462}
2463
2464fn default_bloom_expected() -> i64 {
2465 100_000
2466}
2467fn default_bloom_fpp() -> f64 {
2468 0.01
2469}
2470fn default_lru_size() -> i64 {
2471 10_000
2472}
2473fn default_dedup_window() -> i64 {
2474 3600
2475}
2476
2477#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2479#[serde(rename_all = "camelCase")]
2480pub struct TransactionTopicSpec {
2481 #[serde(default)]
2483 pub enabled: bool,
2484
2485 #[serde(default)]
2487 pub topic_name: Option<String>,
2488
2489 #[serde(default = "default_true_cdc")]
2491 pub include_data_collections: bool,
2492
2493 #[serde(default)]
2495 #[validate(range(min = 0, max = 10000, message = "min events must be 0-10000"))]
2496 pub min_events_threshold: i32,
2497}
2498
2499fn default_true_cdc() -> bool {
2500 true
2501}
2502
2503#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2505#[serde(rename_all = "camelCase")]
2506pub struct SchemaChangeTopicSpec {
2507 #[serde(default)]
2509 pub enabled: bool,
2510
2511 #[serde(default)]
2513 pub topic_name: Option<String>,
2514
2515 #[serde(default = "default_true_cdc")]
2517 pub include_columns: bool,
2518
2519 #[serde(default)]
2521 pub schemas: Vec<String>,
2522}
2523
2524#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2526#[serde(rename_all = "camelCase")]
2527pub struct TombstoneCdcSpec {
2528 #[serde(default)]
2530 pub enabled: bool,
2531
2532 #[serde(default = "default_true_cdc")]
2534 pub after_delete: bool,
2535
2536 #[serde(default)]
2538 #[validate(custom(function = "validate_tombstone_behavior"))]
2539 pub behavior: String,
2540}
2541
2542fn validate_tombstone_behavior(behavior: &str) -> Result<(), ValidationError> {
2543 match behavior {
2544 "" | "emit_null" | "emit_with_key" => Ok(()),
2545 _ => Err(ValidationError::new("invalid_tombstone_behavior")
2546 .with_message("tombstone behavior must be: emit_null or emit_with_key".into())),
2547 }
2548}
2549
2550#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2552#[serde(rename_all = "camelCase")]
2553pub struct FieldEncryptionSpec {
2554 #[serde(default)]
2556 pub enabled: bool,
2557
2558 #[serde(default)]
2560 #[validate(custom(function = "validate_optional_k8s_name"))]
2561 pub key_secret_ref: Option<String>,
2562
2563 #[serde(default)]
2565 pub fields: Vec<String>,
2566
2567 #[serde(default = "default_encryption_algorithm")]
2569 pub algorithm: String,
2570}
2571
2572fn default_encryption_algorithm() -> String {
2573 "aes-256-gcm".to_string()
2574}
2575
2576#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2578#[serde(rename_all = "camelCase")]
2579pub struct ReadOnlyReplicaSpec {
2580 #[serde(default)]
2582 pub enabled: bool,
2583
2584 #[serde(default = "default_lag_threshold")]
2586 #[validate(range(
2587 min = 100,
2588 max = 300000,
2589 message = "lag threshold must be 100-300000ms"
2590 ))]
2591 pub lag_threshold_ms: i64,
2592
2593 #[serde(default = "default_true_cdc")]
2595 pub deduplicate: bool,
2596
2597 #[serde(default)]
2599 #[validate(custom(function = "validate_watermark_source"))]
2600 pub watermark_source: String,
2601}
2602
2603fn default_lag_threshold() -> i64 {
2604 5000
2605}
2606
2607fn validate_watermark_source(source: &str) -> Result<(), ValidationError> {
2608 match source {
2609 "" | "primary" | "replica" => Ok(()),
2610 _ => Err(ValidationError::new("invalid_watermark_source")
2611 .with_message("watermark source must be: primary or replica".into())),
2612 }
2613}
2614
2615#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2617#[serde(rename_all = "camelCase")]
2618pub struct EventRouterSpec {
2619 #[serde(default)]
2621 pub enabled: bool,
2622
2623 #[serde(default)]
2625 pub default_destination: Option<String>,
2626
2627 #[serde(default)]
2629 pub dead_letter_queue: Option<String>,
2630
2631 #[serde(default)]
2633 pub drop_unroutable: bool,
2634
2635 #[serde(default)]
2637 #[validate(nested)]
2638 pub rules: Vec<RouteRuleSpec>,
2639}
2640
2641#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2643#[serde(rename_all = "camelCase")]
2644pub struct RouteRuleSpec {
2645 #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
2647 pub name: String,
2648
2649 #[serde(default)]
2651 pub priority: i32,
2652
2653 #[validate(custom(function = "validate_route_condition_type"))]
2655 pub condition_type: String,
2656
2657 #[serde(default)]
2659 pub condition_value: Option<String>,
2660
2661 #[serde(default)]
2663 pub condition_value2: Option<String>,
2664
2665 pub destinations: Vec<String>,
2667
2668 #[serde(default)]
2670 pub continue_matching: bool,
2671}
2672
2673fn validate_route_condition_type(condition: &str) -> Result<(), ValidationError> {
2674 match condition {
2675 "always" | "table" | "table_pattern" | "schema" | "operation" | "field_equals"
2676 | "field_exists" => Ok(()),
2677 _ => Err(ValidationError::new("invalid_route_condition_type").with_message(
2678 "condition type must be: always, table, table_pattern, schema, operation, field_equals, or field_exists".into(),
2679 )),
2680 }
2681}
2682
2683#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2685#[serde(rename_all = "camelCase")]
2686pub struct PartitionerSpec {
2687 #[serde(default)]
2689 pub enabled: bool,
2690
2691 #[serde(default = "default_num_partitions")]
2693 #[validate(range(min = 1, max = 10000, message = "num partitions must be 1-10000"))]
2694 pub num_partitions: i32,
2695
2696 #[serde(default = "default_partition_strategy")]
2698 #[validate(custom(function = "validate_partition_strategy"))]
2699 pub strategy: String,
2700
2701 #[serde(default)]
2703 pub key_columns: Vec<String>,
2704
2705 #[serde(default)]
2707 pub sticky_partition: Option<i32>,
2708}
2709
2710fn default_num_partitions() -> i32 {
2711 16
2712}
2713fn default_partition_strategy() -> String {
2714 "key_hash".to_string()
2715}
2716
2717fn validate_partition_strategy(strategy: &str) -> Result<(), ValidationError> {
2718 match strategy {
2719 "round_robin" | "key_hash" | "table_hash" | "full_table_hash" | "sticky" => Ok(()),
2720 _ => Err(ValidationError::new("invalid_partition_strategy").with_message(
2721 "partition strategy must be: round_robin, key_hash, table_hash, full_table_hash, or sticky".into(),
2722 )),
2723 }
2724}
2725
2726#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2728#[serde(rename_all = "camelCase")]
2729pub struct SmtTransformSpec {
2730 #[validate(custom(function = "validate_smt_transform_type"))]
2732 pub transform_type: String,
2733
2734 #[serde(default)]
2736 pub name: Option<String>,
2737
2738 #[serde(default)]
2740 pub config: serde_json::Value,
2741}
2742
2743fn validate_smt_transform_type(transform: &str) -> Result<(), ValidationError> {
2744 match transform {
2745 "extract_new_record_state" | "value_to_key" | "timestamp_converter"
2746 | "timezone_converter" | "mask_field" | "filter" | "flatten" | "insert_field"
2747 | "rename_field" | "replace_field" | "cast" | "regex_router" | "content_router"
2748 | "header_to_value" | "unwrap" | "set_null" | "compute_field" | "conditional_smt" => Ok(()),
2749 _ => Err(ValidationError::new("invalid_smt_transform_type").with_message(
2750 "transform type must be a valid SMT (extract_new_record_state, mask_field, filter, etc.)".into(),
2751 )),
2752 }
2753}
2754
2755#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2757#[serde(rename_all = "camelCase")]
2758pub struct ParallelCdcSpec {
2759 #[serde(default)]
2761 pub enabled: bool,
2762
2763 #[serde(default = "default_parallel_concurrency")]
2765 #[validate(range(min = 1, max = 64, message = "concurrency must be 1-64"))]
2766 pub concurrency: i32,
2767
2768 #[serde(default = "default_per_table_buffer")]
2770 #[validate(range(
2771 min = 100,
2772 max = 100000,
2773 message = "per table buffer must be 100-100000"
2774 ))]
2775 pub per_table_buffer: i32,
2776
2777 #[serde(default = "default_output_buffer")]
2779 #[validate(range(min = 1000, max = 1000000, message = "output buffer must be 1000-1M"))]
2780 pub output_buffer: i32,
2781
2782 #[serde(default = "default_true_cdc")]
2784 pub work_stealing: bool,
2785
2786 #[serde(default)]
2788 pub per_table_rate_limit: Option<i64>,
2789
2790 #[serde(default = "default_shutdown_timeout")]
2792 #[validate(range(min = 1, max = 300, message = "shutdown timeout must be 1-300s"))]
2793 pub shutdown_timeout_secs: i32,
2794}
2795
2796fn default_parallel_concurrency() -> i32 {
2797 4
2798}
2799fn default_per_table_buffer() -> i32 {
2800 1000
2801}
2802fn default_output_buffer() -> i32 {
2803 10_000
2804}
2805fn default_shutdown_timeout() -> i32 {
2806 30
2807}
2808
2809#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2811#[serde(rename_all = "camelCase")]
2812pub struct OutboxSpec {
2813 #[serde(default)]
2815 pub enabled: bool,
2816
2817 #[serde(default = "default_outbox_table")]
2819 #[validate(length(min = 1, max = 128, message = "outbox table must be 1-128 characters"))]
2820 pub table_name: String,
2821
2822 #[serde(default = "default_outbox_poll_interval")]
2824 #[validate(range(min = 10, max = 60000, message = "poll interval must be 10-60000ms"))]
2825 pub poll_interval_ms: i32,
2826
2827 #[serde(default = "default_outbox_batch_size")]
2829 #[validate(range(min = 1, max = 10000, message = "batch size must be 1-10000"))]
2830 pub batch_size: i32,
2831
2832 #[serde(default = "default_outbox_max_retries")]
2834 #[validate(range(min = 0, max = 100, message = "max retries must be 0-100"))]
2835 pub max_retries: i32,
2836
2837 #[serde(default = "default_outbox_timeout")]
2839 #[validate(range(min = 1, max = 300, message = "timeout must be 1-300s"))]
2840 pub delivery_timeout_secs: i32,
2841
2842 #[serde(default = "default_true_cdc")]
2844 pub ordered_delivery: bool,
2845
2846 #[serde(default = "default_outbox_retention")]
2848 #[validate(range(min = 60, max = 604800, message = "retention must be 60-604800s"))]
2849 pub retention_secs: i64,
2850
2851 #[serde(default = "default_outbox_concurrency")]
2853 #[validate(range(min = 1, max = 100, message = "concurrency must be 1-100"))]
2854 pub max_concurrency: i32,
2855}
2856
2857fn default_outbox_table() -> String {
2858 "outbox".to_string()
2859}
2860fn default_outbox_poll_interval() -> i32 {
2861 100
2862}
2863fn default_outbox_batch_size() -> i32 {
2864 100
2865}
2866fn default_outbox_max_retries() -> i32 {
2867 3
2868}
2869fn default_outbox_timeout() -> i32 {
2870 30
2871}
2872fn default_outbox_retention() -> i64 {
2873 86400
2874}
2875fn default_outbox_concurrency() -> i32 {
2876 10
2877}
2878
2879#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
2881#[serde(rename_all = "camelCase")]
2882pub struct HealthMonitorSpec {
2883 #[serde(default)]
2885 pub enabled: bool,
2886
2887 #[serde(default = "default_health_check_interval")]
2889 #[validate(range(min = 1, max = 300, message = "check interval must be 1-300s"))]
2890 pub check_interval_secs: i32,
2891
2892 #[serde(default = "default_health_max_lag")]
2894 #[validate(range(min = 1000, max = 3600000, message = "max lag must be 1000-3600000ms"))]
2895 pub max_lag_ms: i64,
2896
2897 #[serde(default = "default_health_failure_threshold")]
2899 #[validate(range(min = 1, max = 10, message = "failure threshold must be 1-10"))]
2900 pub failure_threshold: i32,
2901
2902 #[serde(default = "default_health_success_threshold")]
2904 #[validate(range(min = 1, max = 10, message = "success threshold must be 1-10"))]
2905 pub success_threshold: i32,
2906
2907 #[serde(default = "default_health_check_timeout")]
2909 #[validate(range(min = 1, max = 60, message = "check timeout must be 1-60s"))]
2910 pub check_timeout_secs: i32,
2911
2912 #[serde(default = "default_true_cdc")]
2914 pub auto_recovery: bool,
2915
2916 #[serde(default = "default_health_recovery_delay")]
2918 #[validate(range(min = 1, max = 300, message = "recovery delay must be 1-300s"))]
2919 pub recovery_delay_secs: i32,
2920
2921 #[serde(default = "default_health_max_recovery_delay")]
2923 #[validate(range(min = 1, max = 3600, message = "max recovery delay must be 1-3600s"))]
2924 pub max_recovery_delay_secs: i32,
2925}
2926
2927fn default_health_check_interval() -> i32 {
2928 10
2929}
2930fn default_health_max_lag() -> i64 {
2931 30_000
2932}
2933fn default_health_failure_threshold() -> i32 {
2934 3
2935}
2936fn default_health_success_threshold() -> i32 {
2937 2
2938}
2939fn default_health_check_timeout() -> i32 {
2940 5
2941}
2942fn default_health_recovery_delay() -> i32 {
2943 1
2944}
2945fn default_health_max_recovery_delay() -> i32 {
2946 60
2947}
2948
2949#[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
2987#[kube(
2988 group = "rivven.hupe1980.github.io",
2989 version = "v1alpha1",
2990 kind = "RivvenSchemaRegistry",
2991 plural = "rivvenschemaregistries",
2992 shortname = "rsr",
2993 namespaced,
2994 status = "RivvenSchemaRegistryStatus",
2995 printcolumn = r#"{"name":"Cluster","type":"string","jsonPath":".spec.clusterRef.name"}"#,
2996 printcolumn = r#"{"name":"Replicas","type":"integer","jsonPath":".spec.replicas"}"#,
2997 printcolumn = r#"{"name":"Ready","type":"integer","jsonPath":".status.readyReplicas"}"#,
2998 printcolumn = r#"{"name":"Schemas","type":"integer","jsonPath":".status.schemasRegistered"}"#,
2999 printcolumn = r#"{"name":"Phase","type":"string","jsonPath":".status.phase"}"#,
3000 printcolumn = r#"{"name":"Age","type":"date","jsonPath":".metadata.creationTimestamp"}"#
3001)]
3002#[serde(rename_all = "camelCase")]
3003pub struct RivvenSchemaRegistrySpec {
3004 #[validate(nested)]
3006 pub cluster_ref: ClusterReference,
3007
3008 #[serde(default = "default_schema_registry_replicas")]
3010 #[validate(range(min = 1, max = 10, message = "replicas must be between 1 and 10"))]
3011 pub replicas: i32,
3012
3013 #[serde(default = "default_version")]
3015 pub version: String,
3016
3017 #[serde(default)]
3019 #[validate(custom(function = "validate_optional_image"))]
3020 pub image: Option<String>,
3021
3022 #[serde(default = "default_image_pull_policy")]
3024 #[validate(custom(function = "validate_pull_policy"))]
3025 pub image_pull_policy: String,
3026
3027 #[serde(default)]
3029 pub image_pull_secrets: Vec<String>,
3030
3031 #[serde(default)]
3033 pub resources: Option<serde_json::Value>,
3034
3035 #[serde(default)]
3037 #[validate(nested)]
3038 pub server: SchemaRegistryServerSpec,
3039
3040 #[serde(default)]
3042 #[validate(nested)]
3043 pub storage: SchemaRegistryStorageSpec,
3044
3045 #[serde(default)]
3047 #[validate(nested)]
3048 pub compatibility: SchemaCompatibilitySpec,
3049
3050 #[serde(default)]
3052 #[validate(nested)]
3053 pub schemas: SchemaFormatSpec,
3054
3055 #[serde(default)]
3057 #[validate(nested)]
3058 pub contexts: SchemaContextsSpec,
3059
3060 #[serde(default)]
3062 #[validate(nested)]
3063 pub validation: SchemaValidationSpec,
3064
3065 #[serde(default)]
3067 #[validate(nested)]
3068 pub auth: SchemaRegistryAuthSpec,
3069
3070 #[serde(default)]
3072 #[validate(nested)]
3073 pub tls: SchemaRegistryTlsSpec,
3074
3075 #[serde(default)]
3077 #[validate(nested)]
3078 pub metrics: SchemaRegistryMetricsSpec,
3079
3080 #[serde(default)]
3082 #[validate(nested)]
3083 pub external: ExternalRegistrySpec,
3084
3085 #[serde(default)]
3087 #[validate(custom(function = "validate_annotations"))]
3088 pub pod_annotations: BTreeMap<String, String>,
3089
3090 #[serde(default)]
3092 #[validate(custom(function = "validate_labels"))]
3093 pub pod_labels: BTreeMap<String, String>,
3094
3095 #[serde(default)]
3097 #[validate(length(max = 100, message = "maximum 100 environment variables allowed"))]
3098 pub env: Vec<k8s_openapi::api::core::v1::EnvVar>,
3099
3100 #[serde(default)]
3102 pub node_selector: BTreeMap<String, String>,
3103
3104 #[serde(default)]
3106 #[validate(length(max = 20, message = "maximum 20 tolerations allowed"))]
3107 pub tolerations: Vec<k8s_openapi::api::core::v1::Toleration>,
3108
3109 #[serde(default)]
3111 pub affinity: Option<serde_json::Value>,
3112
3113 #[serde(default)]
3115 #[validate(custom(function = "validate_optional_k8s_name"))]
3116 pub service_account: Option<String>,
3117
3118 #[serde(default)]
3120 pub security_context: Option<serde_json::Value>,
3121
3122 #[serde(default)]
3124 pub container_security_context: Option<serde_json::Value>,
3125
3126 #[serde(default)]
3128 #[validate(nested)]
3129 pub liveness_probe: ProbeSpec,
3130
3131 #[serde(default)]
3133 #[validate(nested)]
3134 pub readiness_probe: ProbeSpec,
3135
3136 #[serde(default)]
3138 #[validate(nested)]
3139 pub pod_disruption_budget: PdbSpec,
3140}
3141
3142fn default_schema_registry_replicas() -> i32 {
3143 1
3144}
3145
3146#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3148#[serde(rename_all = "camelCase")]
3149pub struct SchemaRegistryServerSpec {
3150 #[serde(default = "default_schema_registry_port")]
3152 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3153 pub port: i32,
3154
3155 #[serde(default = "default_bind_address")]
3157 pub bind_address: String,
3158
3159 #[serde(default = "default_request_timeout")]
3161 #[validate(range(min = 1, max = 300, message = "timeout must be 1-300 seconds"))]
3162 pub timeout_seconds: i32,
3163
3164 #[serde(default = "default_max_request_size")]
3166 #[validate(range(
3167 min = 1024,
3168 max = 104857600,
3169 message = "max request size must be 1KB-100MB"
3170 ))]
3171 pub max_request_size: i64,
3172
3173 #[serde(default)]
3175 pub cors_enabled: bool,
3176
3177 #[serde(default)]
3179 pub cors_allowed_origins: Vec<String>,
3180}
3181
3182fn default_schema_registry_port() -> i32 {
3183 8081
3184}
3185
3186fn default_bind_address() -> String {
3187 "0.0.0.0".to_string()
3188}
3189
3190fn default_request_timeout() -> i32 {
3191 30
3192}
3193
3194fn default_max_request_size() -> i64 {
3195 10_485_760 }
3197
3198impl Default for SchemaRegistryServerSpec {
3199 fn default() -> Self {
3200 Self {
3201 port: 8081,
3202 bind_address: "0.0.0.0".to_string(),
3203 timeout_seconds: 30,
3204 max_request_size: 10_485_760,
3205 cors_enabled: false,
3206 cors_allowed_origins: vec![],
3207 }
3208 }
3209}
3210
3211#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3213#[serde(rename_all = "camelCase")]
3214pub struct SchemaRegistryStorageSpec {
3215 #[serde(default = "default_storage_mode")]
3217 #[validate(custom(function = "validate_storage_mode"))]
3218 pub mode: String,
3219
3220 #[serde(default = "default_schema_topic")]
3222 #[validate(length(max = 255, message = "topic name max 255 characters"))]
3223 pub topic: String,
3224
3225 #[serde(default = "default_schema_topic_replication")]
3227 #[validate(range(min = 1, max = 10, message = "replication factor must be 1-10"))]
3228 pub replication_factor: i32,
3229
3230 #[serde(default = "default_schema_topic_partitions")]
3232 #[validate(range(min = 1, max = 100, message = "partitions must be 1-100"))]
3233 pub partitions: i32,
3234
3235 #[serde(default = "default_true")]
3237 pub normalize: bool,
3238}
3239
3240fn default_storage_mode() -> String {
3241 "broker".to_string()
3242}
3243
3244fn default_schema_topic() -> String {
3245 "_schemas".to_string()
3246}
3247
3248fn default_schema_topic_replication() -> i32 {
3249 3
3250}
3251
3252fn default_schema_topic_partitions() -> i32 {
3253 1
3254}
3255
3256fn validate_storage_mode(mode: &str) -> Result<(), ValidationError> {
3257 match mode {
3258 "memory" | "broker" => Ok(()),
3259 _ => Err(ValidationError::new("invalid_storage_mode")
3260 .with_message("storage mode must be 'memory' or 'broker'".into())),
3261 }
3262}
3263
3264impl Default for SchemaRegistryStorageSpec {
3265 fn default() -> Self {
3266 Self {
3267 mode: "broker".to_string(),
3268 topic: "_schemas".to_string(),
3269 replication_factor: 3,
3270 partitions: 1,
3271 normalize: true,
3272 }
3273 }
3274}
3275
3276#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3278#[serde(rename_all = "camelCase")]
3279pub struct SchemaCompatibilitySpec {
3280 #[serde(default = "default_compatibility_level")]
3282 #[validate(custom(function = "validate_compatibility_level"))]
3283 pub default_level: String,
3284
3285 #[serde(default = "default_true")]
3287 pub allow_overrides: bool,
3288
3289 #[serde(default)]
3291 #[validate(custom(function = "validate_subject_compatibility_map"))]
3292 pub subjects: BTreeMap<String, String>,
3293}
3294
3295fn default_compatibility_level() -> String {
3296 "BACKWARD".to_string()
3297}
3298
3299fn validate_compatibility_level(level: &str) -> Result<(), ValidationError> {
3300 let valid_levels = [
3301 "BACKWARD",
3302 "BACKWARD_TRANSITIVE",
3303 "FORWARD",
3304 "FORWARD_TRANSITIVE",
3305 "FULL",
3306 "FULL_TRANSITIVE",
3307 "NONE",
3308 ];
3309 if valid_levels.contains(&level) {
3310 Ok(())
3311 } else {
3312 Err(
3313 ValidationError::new("invalid_compatibility_level").with_message(
3314 format!(
3315 "'{}' is not valid. Must be one of: {:?}",
3316 level, valid_levels
3317 )
3318 .into(),
3319 ),
3320 )
3321 }
3322}
3323
3324fn validate_subject_compatibility_map(
3325 subjects: &BTreeMap<String, String>,
3326) -> Result<(), ValidationError> {
3327 if subjects.len() > 1000 {
3328 return Err(ValidationError::new("too_many_subjects")
3329 .with_message("maximum 1000 per-subject compatibility entries".into()));
3330 }
3331 for (subject, level) in subjects {
3332 if subject.len() > 255 {
3333 return Err(ValidationError::new("subject_name_too_long")
3334 .with_message(format!("subject '{}' exceeds 255 characters", subject).into()));
3335 }
3336 validate_compatibility_level(level)?;
3337 }
3338 Ok(())
3339}
3340
3341impl Default for SchemaCompatibilitySpec {
3342 fn default() -> Self {
3343 Self {
3344 default_level: "BACKWARD".to_string(),
3345 allow_overrides: true,
3346 subjects: BTreeMap::new(),
3347 }
3348 }
3349}
3350
3351#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3353#[serde(rename_all = "camelCase")]
3354pub struct SchemaFormatSpec {
3355 #[serde(default = "default_true")]
3357 pub avro: bool,
3358
3359 #[serde(default = "default_true")]
3361 pub json_schema: bool,
3362
3363 #[serde(default = "default_true")]
3365 pub protobuf: bool,
3366
3367 #[serde(default = "default_true")]
3369 pub strict_validation: bool,
3370}
3371
3372impl Default for SchemaFormatSpec {
3373 fn default() -> Self {
3374 Self {
3375 avro: true,
3376 json_schema: true,
3377 protobuf: true,
3378 strict_validation: true,
3379 }
3380 }
3381}
3382
3383#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3385#[serde(rename_all = "camelCase")]
3386pub struct SchemaContextsSpec {
3387 #[serde(default)]
3389 pub enabled: bool,
3390
3391 #[serde(default)]
3393 #[validate(range(min = 0, max = 10000, message = "max contexts must be 0-10000"))]
3394 pub max_contexts: i32,
3395
3396 #[serde(default)]
3398 #[validate(length(max = 100, message = "maximum 100 pre-defined contexts"))]
3399 pub predefined: Vec<SchemaContextDefinition>,
3400}
3401
3402#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3404#[serde(rename_all = "camelCase")]
3405pub struct SchemaContextDefinition {
3406 #[validate(length(min = 1, max = 128, message = "context name must be 1-128 characters"))]
3408 pub name: String,
3409
3410 #[serde(default)]
3412 #[validate(length(max = 512, message = "description max 512 characters"))]
3413 pub description: Option<String>,
3414
3415 #[serde(default = "default_true")]
3417 pub active: bool,
3418}
3419
3420#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3422#[serde(rename_all = "camelCase")]
3423pub struct SchemaValidationSpec {
3424 #[serde(default)]
3426 pub enabled: bool,
3427
3428 #[serde(default = "default_max_schema_size")]
3430 #[validate(range(
3431 min = 1024,
3432 max = 10485760,
3433 message = "max schema size must be 1KB-10MB"
3434 ))]
3435 pub max_schema_size: i64,
3436
3437 #[serde(default)]
3439 #[validate(length(max = 100, message = "maximum 100 validation rules"))]
3440 pub rules: Vec<SchemaValidationRule>,
3441}
3442
3443fn default_max_schema_size() -> i64 {
3444 1_048_576 }
3446
3447impl Default for SchemaValidationSpec {
3448 fn default() -> Self {
3449 Self {
3450 enabled: false,
3451 max_schema_size: 1_048_576,
3452 rules: vec![],
3453 }
3454 }
3455}
3456
3457#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3459#[serde(rename_all = "camelCase")]
3460pub struct SchemaValidationRule {
3461 #[validate(length(min = 1, max = 128, message = "rule name must be 1-128 characters"))]
3463 pub name: String,
3464
3465 #[validate(custom(function = "validate_rule_type"))]
3467 pub rule_type: String,
3468
3469 #[validate(length(min = 1, max = 4096, message = "pattern must be 1-4096 characters"))]
3471 pub pattern: String,
3472
3473 #[serde(default)]
3475 pub subjects: Vec<String>,
3476
3477 #[serde(default)]
3479 pub schema_types: Vec<String>,
3480
3481 #[serde(default = "default_validation_level")]
3483 #[validate(custom(function = "validate_validation_level"))]
3484 pub level: String,
3485
3486 #[serde(default)]
3488 #[validate(length(max = 512, message = "description max 512 characters"))]
3489 pub description: Option<String>,
3490}
3491
3492fn validate_rule_type(rule_type: &str) -> Result<(), ValidationError> {
3493 let valid_types = [
3494 "regex",
3495 "field_exists",
3496 "field_type",
3497 "naming_convention",
3498 "documentation",
3499 ];
3500 if valid_types.contains(&rule_type) {
3501 Ok(())
3502 } else {
3503 Err(ValidationError::new("invalid_rule_type").with_message(
3504 format!(
3505 "'{}' is not valid. Must be one of: {:?}",
3506 rule_type, valid_types
3507 )
3508 .into(),
3509 ))
3510 }
3511}
3512
3513fn default_validation_level() -> String {
3514 "error".to_string()
3515}
3516
3517fn validate_validation_level(level: &str) -> Result<(), ValidationError> {
3518 match level {
3519 "error" | "warning" | "info" => Ok(()),
3520 _ => Err(ValidationError::new("invalid_validation_level")
3521 .with_message("validation level must be 'error', 'warning', or 'info'".into())),
3522 }
3523}
3524
3525#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3527#[serde(rename_all = "camelCase")]
3528pub struct SchemaRegistryAuthSpec {
3529 #[serde(default)]
3531 pub enabled: bool,
3532
3533 #[serde(default)]
3535 #[validate(custom(function = "validate_auth_method"))]
3536 pub method: Option<String>,
3537
3538 #[serde(default)]
3540 #[validate(custom(function = "validate_optional_k8s_name"))]
3541 pub credentials_secret_ref: Option<String>,
3542
3543 #[serde(default)]
3545 #[validate(nested)]
3546 pub jwt: JwtAuthSpec,
3547
3548 #[serde(default)]
3550 #[validate(length(max = 100, message = "maximum 100 users"))]
3551 pub users: Vec<SchemaRegistryUser>,
3552}
3553
3554fn validate_auth_method(method: &str) -> Result<(), ValidationError> {
3555 match method {
3556 "" | "basic" | "jwt" | "cedar" => Ok(()),
3557 _ => Err(ValidationError::new("invalid_auth_method")
3558 .with_message("auth method must be 'basic', 'jwt', or 'cedar'".into())),
3559 }
3560}
3561
3562#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3564#[serde(rename_all = "camelCase")]
3565pub struct JwtAuthSpec {
3566 #[serde(default)]
3568 pub issuer_url: Option<String>,
3569
3570 #[serde(default)]
3572 pub jwks_url: Option<String>,
3573
3574 #[serde(default)]
3576 pub audience: Option<String>,
3577
3578 #[serde(default = "default_username_claim")]
3580 pub username_claim: String,
3581
3582 #[serde(default = "default_roles_claim")]
3584 pub roles_claim: String,
3585}
3586
3587fn default_username_claim() -> String {
3588 "sub".to_string()
3589}
3590
3591fn default_roles_claim() -> String {
3592 "roles".to_string()
3593}
3594
3595#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3597#[serde(rename_all = "camelCase")]
3598pub struct SchemaRegistryUser {
3599 #[validate(length(min = 1, max = 128, message = "username must be 1-128 characters"))]
3601 pub username: String,
3602
3603 #[serde(default)]
3605 #[validate(custom(function = "validate_optional_k8s_name"))]
3606 pub password_secret_key: Option<String>,
3607
3608 #[serde(default = "default_user_role")]
3610 #[validate(custom(function = "validate_user_role"))]
3611 pub role: String,
3612
3613 #[serde(default)]
3615 pub allowed_subjects: Vec<String>,
3616}
3617
3618fn default_user_role() -> String {
3619 "reader".to_string()
3620}
3621
3622fn validate_user_role(role: &str) -> Result<(), ValidationError> {
3623 match role {
3624 "admin" | "writer" | "reader" => Ok(()),
3625 _ => Err(ValidationError::new("invalid_user_role")
3626 .with_message("role must be 'admin', 'writer', or 'reader'".into())),
3627 }
3628}
3629
3630#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3632#[serde(rename_all = "camelCase")]
3633pub struct SchemaRegistryTlsSpec {
3634 #[serde(default)]
3636 pub enabled: bool,
3637
3638 #[serde(default)]
3640 #[validate(custom(function = "validate_optional_k8s_name"))]
3641 pub cert_secret_name: Option<String>,
3642
3643 #[serde(default)]
3645 pub mtls_enabled: bool,
3646
3647 #[serde(default)]
3649 #[validate(custom(function = "validate_optional_k8s_name"))]
3650 pub ca_secret_name: Option<String>,
3651
3652 #[serde(default)]
3654 pub broker_tls_enabled: bool,
3655
3656 #[serde(default)]
3658 #[validate(custom(function = "validate_optional_k8s_name"))]
3659 pub broker_cert_secret_name: Option<String>,
3660}
3661
3662#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema, Validate)]
3664#[serde(rename_all = "camelCase")]
3665pub struct SchemaRegistryMetricsSpec {
3666 #[serde(default = "default_true")]
3668 pub enabled: bool,
3669
3670 #[serde(default = "default_schema_metrics_port")]
3672 #[validate(range(min = 1024, max = 65535, message = "port must be 1024-65535"))]
3673 pub port: i32,
3674
3675 #[serde(default = "default_metrics_path")]
3677 pub path: String,
3678
3679 #[serde(default)]
3681 pub service_monitor_enabled: bool,
3682
3683 #[serde(default = "default_scrape_interval")]
3685 #[validate(custom(function = "validate_duration"))]
3686 pub scrape_interval: String,
3687}
3688
3689fn default_schema_metrics_port() -> i32 {
3690 9090
3691}
3692
3693fn default_metrics_path() -> String {
3694 "/metrics".to_string()
3695}
3696
3697impl Default for SchemaRegistryMetricsSpec {
3698 fn default() -> Self {
3699 Self {
3700 enabled: true,
3701 port: 9090,
3702 path: "/metrics".to_string(),
3703 service_monitor_enabled: false,
3704 scrape_interval: "30s".to_string(),
3705 }
3706 }
3707}
3708
3709#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, Validate)]
3711#[serde(rename_all = "camelCase")]
3712pub struct ExternalRegistrySpec {
3713 #[serde(default)]
3715 pub enabled: bool,
3716
3717 #[serde(default)]
3719 #[validate(custom(function = "validate_external_registry_type"))]
3720 pub registry_type: Option<String>,
3721
3722 #[serde(default)]
3724 pub confluent_url: Option<String>,
3725
3726 #[serde(default)]
3728 pub glue_registry_arn: Option<String>,
3729
3730 #[serde(default)]
3732 pub aws_region: Option<String>,
3733
3734 #[serde(default)]
3736 #[validate(custom(function = "validate_sync_mode"))]
3737 pub sync_mode: Option<String>,
3738
3739 #[serde(default)]
3741 pub sync_subjects: Vec<String>,
3742
3743 #[serde(default = "default_sync_interval")]
3745 #[validate(range(
3746 min = 10,
3747 max = 86400,
3748 message = "sync interval must be 10-86400 seconds"
3749 ))]
3750 pub sync_interval_seconds: i32,
3751
3752 #[serde(default)]
3754 #[validate(custom(function = "validate_optional_k8s_name"))]
3755 pub credentials_secret_ref: Option<String>,
3756}
3757
3758fn validate_external_registry_type(reg_type: &str) -> Result<(), ValidationError> {
3759 match reg_type {
3760 "" | "confluent" | "glue" => Ok(()),
3761 _ => Err(ValidationError::new("invalid_external_registry_type")
3762 .with_message("registry type must be 'confluent' or 'glue'".into())),
3763 }
3764}
3765
3766fn validate_sync_mode(mode: &str) -> Result<(), ValidationError> {
3767 match mode {
3768 "" | "mirror" | "push" | "bidirectional" => Ok(()),
3769 _ => Err(ValidationError::new("invalid_sync_mode")
3770 .with_message("sync mode must be 'mirror', 'push', or 'bidirectional'".into())),
3771 }
3772}
3773
3774fn default_sync_interval() -> i32 {
3775 300 }
3777
3778#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema)]
3780#[serde(rename_all = "camelCase")]
3781pub struct RivvenSchemaRegistryStatus {
3782 #[serde(default)]
3784 pub phase: SchemaRegistryPhase,
3785
3786 pub replicas: i32,
3788
3789 pub ready_replicas: i32,
3791
3792 pub schemas_registered: i32,
3794
3795 pub subjects_count: i32,
3797
3798 pub contexts_count: i32,
3800
3801 pub observed_generation: i64,
3803
3804 #[serde(default)]
3806 pub conditions: Vec<SchemaRegistryCondition>,
3807
3808 #[serde(default)]
3810 pub endpoints: Vec<String>,
3811
3812 pub storage_status: Option<String>,
3814
3815 pub external_sync_status: Option<String>,
3817
3818 pub last_sync_time: Option<String>,
3820
3821 pub last_updated: Option<String>,
3823
3824 pub message: Option<String>,
3826}
3827
3828#[derive(Debug, Clone, Default, Deserialize, Serialize, JsonSchema, PartialEq, Eq)]
3830pub enum SchemaRegistryPhase {
3831 #[default]
3833 Pending,
3834 Provisioning,
3836 Running,
3838 Updating,
3840 Degraded,
3842 Failed,
3844 Terminating,
3846}
3847
3848#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
3850#[serde(rename_all = "camelCase")]
3851pub struct SchemaRegistryCondition {
3852 #[serde(rename = "type")]
3854 pub condition_type: String,
3855
3856 pub status: String,
3858
3859 pub reason: Option<String>,
3861
3862 pub message: Option<String>,
3864
3865 pub last_transition_time: Option<String>,
3867}
3868
3869impl RivvenSchemaRegistrySpec {
3870 pub fn get_image(&self) -> String {
3872 if let Some(ref image) = self.image {
3873 image.clone()
3874 } else {
3875 format!("ghcr.io/hupe1980/rivven-schema:{}", self.version)
3876 }
3877 }
3878
3879 pub fn get_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3881 let mut labels = BTreeMap::new();
3882 labels.insert(
3883 "app.kubernetes.io/name".to_string(),
3884 "rivven-schema-registry".to_string(),
3885 );
3886 labels.insert(
3887 "app.kubernetes.io/instance".to_string(),
3888 registry_name.to_string(),
3889 );
3890 labels.insert(
3891 "app.kubernetes.io/component".to_string(),
3892 "schema-registry".to_string(),
3893 );
3894 labels.insert(
3895 "app.kubernetes.io/managed-by".to_string(),
3896 "rivven-operator".to_string(),
3897 );
3898 labels.insert(
3899 "app.kubernetes.io/version".to_string(),
3900 self.version.clone(),
3901 );
3902 labels
3903 }
3904
3905 pub fn get_selector_labels(&self, registry_name: &str) -> BTreeMap<String, String> {
3907 let mut labels = BTreeMap::new();
3908 labels.insert(
3909 "app.kubernetes.io/name".to_string(),
3910 "rivven-schema-registry".to_string(),
3911 );
3912 labels.insert(
3913 "app.kubernetes.io/instance".to_string(),
3914 registry_name.to_string(),
3915 );
3916 labels
3917 }
3918}
3919
3920#[cfg(test)]
3921mod tests {
3922 use super::*;
3923
3924 #[test]
3925 fn test_default_spec() {
3926 let spec = RivvenClusterSpec {
3927 replicas: 3,
3928 version: "0.0.1".to_string(),
3929 image: None,
3930 image_pull_policy: "IfNotPresent".to_string(),
3931 image_pull_secrets: vec![],
3932 storage: StorageSpec::default(),
3933 resources: None,
3934 config: BrokerConfig::default(),
3935 tls: TlsSpec::default(),
3936 metrics: MetricsSpec::default(),
3937 affinity: None,
3938 node_selector: BTreeMap::new(),
3939 tolerations: vec![],
3940 pod_disruption_budget: PdbSpec::default(),
3941 service_account: None,
3942 pod_annotations: BTreeMap::new(),
3943 pod_labels: BTreeMap::new(),
3944 env: vec![],
3945 liveness_probe: ProbeSpec::default(),
3946 readiness_probe: ProbeSpec::default(),
3947 security_context: None,
3948 container_security_context: None,
3949 };
3950
3951 assert_eq!(spec.replicas, 3);
3952 assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven:0.0.1");
3953 }
3954
3955 #[test]
3956 fn test_get_labels() {
3957 let spec = RivvenClusterSpec {
3958 replicas: 3,
3959 version: "0.0.1".to_string(),
3960 image: None,
3961 image_pull_policy: "IfNotPresent".to_string(),
3962 image_pull_secrets: vec![],
3963 storage: StorageSpec::default(),
3964 resources: None,
3965 config: BrokerConfig::default(),
3966 tls: TlsSpec::default(),
3967 metrics: MetricsSpec::default(),
3968 affinity: None,
3969 node_selector: BTreeMap::new(),
3970 tolerations: vec![],
3971 pod_disruption_budget: PdbSpec::default(),
3972 service_account: None,
3973 pod_annotations: BTreeMap::new(),
3974 pod_labels: BTreeMap::new(),
3975 env: vec![],
3976 liveness_probe: ProbeSpec::default(),
3977 readiness_probe: ProbeSpec::default(),
3978 security_context: None,
3979 container_security_context: None,
3980 };
3981
3982 let labels = spec.get_labels("my-cluster");
3983 assert_eq!(
3984 labels.get("app.kubernetes.io/name"),
3985 Some(&"rivven".to_string())
3986 );
3987 assert_eq!(
3988 labels.get("app.kubernetes.io/instance"),
3989 Some(&"my-cluster".to_string())
3990 );
3991 }
3992
3993 #[test]
3994 fn test_custom_image() {
3995 let spec = RivvenClusterSpec {
3996 replicas: 1,
3997 version: "0.0.1".to_string(),
3998 image: Some("my-registry/rivven:custom".to_string()),
3999 image_pull_policy: "Always".to_string(),
4000 image_pull_secrets: vec![],
4001 storage: StorageSpec::default(),
4002 resources: None,
4003 config: BrokerConfig::default(),
4004 tls: TlsSpec::default(),
4005 metrics: MetricsSpec::default(),
4006 affinity: None,
4007 node_selector: BTreeMap::new(),
4008 tolerations: vec![],
4009 pod_disruption_budget: PdbSpec::default(),
4010 service_account: None,
4011 pod_annotations: BTreeMap::new(),
4012 pod_labels: BTreeMap::new(),
4013 env: vec![],
4014 liveness_probe: ProbeSpec::default(),
4015 readiness_probe: ProbeSpec::default(),
4016 security_context: None,
4017 container_security_context: None,
4018 };
4019
4020 assert_eq!(spec.get_image(), "my-registry/rivven:custom");
4021 }
4022
4023 #[test]
4024 fn test_cluster_phase_default() {
4025 let phase = ClusterPhase::default();
4026 assert_eq!(phase, ClusterPhase::Pending);
4027 }
4028
4029 #[test]
4030 fn test_storage_spec_default() {
4031 let storage = StorageSpec::default();
4032 assert_eq!(storage.size, "10Gi");
4033 assert!(storage.storage_class_name.is_none());
4034 }
4035
4036 #[test]
4037 fn test_broker_config_defaults() {
4038 let config = BrokerConfig::default();
4039 assert_eq!(config.default_partitions, 3);
4040 assert_eq!(config.default_replication_factor, 2);
4041 assert!(config.auto_create_topics);
4042 }
4043
4044 #[test]
4045 fn test_probe_spec_defaults() {
4046 let probe = ProbeSpec::default();
4047 assert!(probe.enabled);
4048 assert_eq!(probe.initial_delay_seconds, 30);
4049 assert_eq!(probe.period_seconds, 10);
4050 }
4051
4052 #[test]
4053 fn test_validate_quantity_valid() {
4054 assert!(validate_quantity("10Gi").is_ok());
4055 assert!(validate_quantity("100Mi").is_ok());
4056 assert!(validate_quantity("1Ti").is_ok());
4057 assert!(validate_quantity("500").is_ok());
4058 assert!(validate_quantity("1.5Gi").is_ok());
4059 }
4060
4061 #[test]
4062 fn test_validate_quantity_invalid() {
4063 assert!(validate_quantity("10GB").is_err()); assert!(validate_quantity("abc").is_err()); assert!(validate_quantity("-10Gi").is_err()); assert!(validate_quantity("").is_err()); }
4068
4069 #[test]
4070 fn test_validate_k8s_name_valid() {
4071 assert!(validate_k8s_name("my-cluster").is_ok());
4072 assert!(validate_k8s_name("cluster123").is_ok());
4073 assert!(validate_k8s_name("a").is_ok());
4074 }
4075
4076 #[test]
4077 fn test_validate_k8s_name_invalid() {
4078 assert!(validate_k8s_name("My-Cluster").is_err()); assert!(validate_k8s_name("-cluster").is_err()); assert!(validate_k8s_name("cluster-").is_err()); assert!(validate_k8s_name("cluster_name").is_err()); }
4083
4084 #[test]
4085 fn test_validate_compression_type() {
4086 assert!(validate_compression_type("lz4").is_ok());
4087 assert!(validate_compression_type("zstd").is_ok());
4088 assert!(validate_compression_type("none").is_ok());
4089 assert!(validate_compression_type("invalid").is_err());
4090 }
4091
4092 #[test]
4093 fn test_validate_segment_size() {
4094 assert!(validate_segment_size(1_048_576).is_ok()); assert!(validate_segment_size(10_737_418_240).is_ok()); assert!(validate_segment_size(1_073_741_824).is_ok()); assert!(validate_segment_size(1_000).is_err()); assert!(validate_segment_size(20_000_000_000).is_err()); }
4100
4101 #[test]
4102 fn test_validate_message_size() {
4103 assert!(validate_message_size(1_024).is_ok()); assert!(validate_message_size(104_857_600).is_ok()); assert!(validate_message_size(1_048_576).is_ok()); assert!(validate_message_size(100).is_err()); assert!(validate_message_size(200_000_000).is_err()); }
4109
4110 #[test]
4111 fn test_validate_pull_policy() {
4112 assert!(validate_pull_policy("Always").is_ok());
4113 assert!(validate_pull_policy("IfNotPresent").is_ok());
4114 assert!(validate_pull_policy("Never").is_ok());
4115 assert!(validate_pull_policy("always").is_err()); assert!(validate_pull_policy("Invalid").is_err());
4117 }
4118
4119 #[test]
4120 fn test_validate_duration() {
4121 assert!(validate_duration("30s").is_ok());
4122 assert!(validate_duration("1m").is_ok());
4123 assert!(validate_duration("5m30s").is_ok());
4124 assert!(validate_duration("1h").is_ok());
4125 assert!(validate_duration("invalid").is_err());
4126 assert!(validate_duration("30").is_err()); }
4128
4129 #[test]
4130 fn test_validate_access_modes() {
4131 assert!(validate_access_modes(&["ReadWriteOnce".to_string()]).is_ok());
4132 assert!(
4133 validate_access_modes(&["ReadWriteOnce".to_string(), "ReadOnlyMany".to_string()])
4134 .is_ok()
4135 );
4136 assert!(validate_access_modes(&["Invalid".to_string()]).is_err());
4137 }
4138
4139 #[test]
4141 fn test_connect_spec_defaults() {
4142 let spec = RivvenConnectSpec {
4143 cluster_ref: ClusterReference {
4144 name: "my-cluster".to_string(),
4145 namespace: None,
4146 },
4147 replicas: 1,
4148 version: "0.0.1".to_string(),
4149 image: None,
4150 image_pull_policy: "IfNotPresent".to_string(),
4151 image_pull_secrets: vec![],
4152 resources: None,
4153 config: ConnectConfigSpec::default(),
4154 sources: vec![],
4155 sinks: vec![],
4156 settings: GlobalConnectSettings::default(),
4157 tls: ConnectTlsSpec::default(),
4158 pod_annotations: BTreeMap::new(),
4159 pod_labels: BTreeMap::new(),
4160 env: vec![],
4161 node_selector: BTreeMap::new(),
4162 tolerations: vec![],
4163 affinity: None,
4164 service_account: None,
4165 security_context: None,
4166 container_security_context: None,
4167 };
4168 assert_eq!(spec.replicas, 1);
4169 }
4170
4171 #[test]
4172 fn test_connect_phase_default() {
4173 let phase = ConnectPhase::default();
4174 assert_eq!(phase, ConnectPhase::Pending);
4175 }
4176
4177 #[test]
4178 fn test_validate_connector_type() {
4179 assert!(validate_connector_type("postgres-cdc").is_ok());
4180 assert!(validate_connector_type("mysql-cdc").is_ok());
4181 assert!(validate_connector_type("http").is_ok());
4182 assert!(validate_connector_type("stdout").is_ok());
4183 assert!(validate_connector_type("s3").is_ok());
4184 assert!(validate_connector_type("datagen").is_ok());
4185 assert!(validate_connector_type("custom-connector").is_ok());
4186 }
4187
4188 #[test]
4189 fn test_validate_start_offset() {
4190 assert!(validate_start_offset("earliest").is_ok());
4191 assert!(validate_start_offset("latest").is_ok());
4192 assert!(validate_start_offset("2024-01-01T00:00:00Z").is_ok());
4193 assert!(validate_start_offset("2024-06-15T12:30:00+02:00").is_ok());
4194 assert!(validate_start_offset("invalid").is_err());
4195 assert!(validate_start_offset("not-a-dateT:").is_err());
4197 assert!(validate_start_offset("xTy:z").is_err());
4198 assert!(validate_start_offset("2024-13-01T00:00:00Z").is_err()); assert!(validate_start_offset("2024-01-32T00:00:00Z").is_err()); }
4201
4202 #[test]
4203 fn test_validate_image_valid() {
4204 assert!(validate_image("nginx").is_ok());
4205 assert!(validate_image("nginx:latest").is_ok());
4206 assert!(validate_image("ghcr.io/hupe1980/rivven:0.0.1").is_ok());
4207 assert!(validate_image("my-registry.io:5000/image:tag").is_ok());
4208 assert!(validate_image("localhost:5000/myimage").is_ok());
4209 assert!(validate_image("").is_ok()); }
4211
4212 #[test]
4213 fn test_validate_image_invalid() {
4214 assert!(validate_image("/absolute/path").is_err()); assert!(validate_image("-invalid").is_err()); assert!(validate_image("image..path").is_err()); let long_name = "a".repeat(300);
4219 assert!(validate_image(&long_name).is_err());
4220 }
4221
4222 #[test]
4223 fn test_validate_node_selector() {
4224 let mut selectors = BTreeMap::new();
4225 selectors.insert("node-type".to_string(), "compute".to_string());
4226 assert!(validate_node_selector(&selectors).is_ok());
4227
4228 let mut many = BTreeMap::new();
4230 for i in 0..25 {
4231 many.insert(format!("key-{}", i), "value".to_string());
4232 }
4233 assert!(validate_node_selector(&many).is_err());
4234 }
4235
4236 #[test]
4237 fn test_validate_annotations() {
4238 let mut annotations = BTreeMap::new();
4239 annotations.insert("prometheus.io/scrape".to_string(), "true".to_string());
4240 assert!(validate_annotations(&annotations).is_ok());
4241
4242 let mut many = BTreeMap::new();
4244 for i in 0..55 {
4245 many.insert(format!("annotation-{}", i), "value".to_string());
4246 }
4247 assert!(validate_annotations(&many).is_err());
4248 }
4249
4250 #[test]
4251 fn test_validate_labels() {
4252 let mut labels = BTreeMap::new();
4253 labels.insert("team".to_string(), "platform".to_string());
4254 assert!(validate_labels(&labels).is_ok());
4255
4256 let mut reserved = BTreeMap::new();
4258 reserved.insert("app.kubernetes.io/custom".to_string(), "value".to_string());
4259 assert!(validate_labels(&reserved).is_err());
4260 }
4261
4262 #[test]
4263 fn test_validate_raw_config() {
4264 let mut config = BTreeMap::new();
4265 config.insert("custom.setting".to_string(), "value".to_string());
4266 assert!(validate_raw_config(&config).is_ok());
4267
4268 let mut forbidden = BTreeMap::new();
4270 forbidden.insert("command".to_string(), "/bin/sh".to_string());
4271 assert!(validate_raw_config(&forbidden).is_err());
4272
4273 let mut many = BTreeMap::new();
4275 for i in 0..55 {
4276 many.insert(format!("config-{}", i), "value".to_string());
4277 }
4278 assert!(validate_raw_config(&many).is_err());
4279 }
4280
4281 #[test]
4282 fn test_validate_int_or_percent() {
4283 assert!(validate_optional_int_or_percent("1").is_ok());
4284 assert!(validate_optional_int_or_percent("25%").is_ok());
4285 assert!(validate_optional_int_or_percent("100%").is_ok());
4286 assert!(validate_optional_int_or_percent("").is_ok()); assert!(validate_optional_int_or_percent("abc").is_err());
4288 assert!(validate_optional_int_or_percent("25%%").is_err());
4289 }
4290
4291 #[test]
4292 fn test_tls_spec_default() {
4293 let tls = TlsSpec::default();
4294 assert!(!tls.enabled);
4295 assert!(tls.cert_secret_name.is_none());
4296 assert!(!tls.mtls_enabled);
4297 }
4298
4299 #[test]
4300 fn test_metrics_spec_default() {
4301 let metrics = MetricsSpec::default();
4302 assert!(metrics.enabled);
4303 assert_eq!(metrics.port, 9090);
4304 }
4305
4306 #[test]
4307 fn test_pdb_spec_default() {
4308 let pdb = PdbSpec::default();
4309 assert!(pdb.enabled);
4310 assert!(pdb.min_available.is_none());
4311 assert_eq!(pdb.max_unavailable, Some("1".to_string()));
4312 }
4313
4314 #[test]
4315 fn test_service_monitor_labels() {
4316 let mut labels = BTreeMap::new();
4317 labels.insert("release".to_string(), "prometheus".to_string());
4318 assert!(validate_service_monitor_labels(&labels).is_ok());
4319
4320 let mut many = BTreeMap::new();
4322 for i in 0..15 {
4323 many.insert(format!("label-{}", i), "value".to_string());
4324 }
4325 assert!(validate_service_monitor_labels(&many).is_err());
4326 }
4327
4328 #[test]
4329 fn test_cluster_condition_time_format() {
4330 let condition = ClusterCondition {
4331 condition_type: "Ready".to_string(),
4332 status: "True".to_string(),
4333 last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4334 reason: Some("AllReplicasReady".to_string()),
4335 message: Some("All replicas are ready".to_string()),
4336 };
4337 assert!(condition.last_transition_time.unwrap().contains('T'));
4338 }
4339
4340 #[test]
4345 fn test_schema_registry_phase_default() {
4346 let phase = SchemaRegistryPhase::default();
4347 assert_eq!(phase, SchemaRegistryPhase::Pending);
4348 }
4349
4350 #[test]
4351 fn test_schema_registry_server_spec_default() {
4352 let spec = SchemaRegistryServerSpec::default();
4353 assert_eq!(spec.port, 8081);
4354 assert_eq!(spec.bind_address, "0.0.0.0");
4355 assert_eq!(spec.timeout_seconds, 30);
4356 assert_eq!(spec.max_request_size, 10_485_760);
4357 assert!(!spec.cors_enabled);
4358 }
4359
4360 #[test]
4361 fn test_schema_registry_storage_spec_default() {
4362 let spec = SchemaRegistryStorageSpec::default();
4363 assert_eq!(spec.mode, "broker");
4364 assert_eq!(spec.topic, "_schemas");
4365 assert_eq!(spec.replication_factor, 3);
4366 assert_eq!(spec.partitions, 1);
4367 assert!(spec.normalize);
4368 }
4369
4370 #[test]
4371 fn test_validate_storage_mode() {
4372 assert!(validate_storage_mode("memory").is_ok());
4373 assert!(validate_storage_mode("broker").is_ok());
4374 assert!(validate_storage_mode("invalid").is_err());
4375 }
4376
4377 #[test]
4378 fn test_schema_compatibility_spec_default() {
4379 let spec = SchemaCompatibilitySpec::default();
4380 assert_eq!(spec.default_level, "BACKWARD");
4381 assert!(spec.allow_overrides);
4382 assert!(spec.subjects.is_empty());
4383 }
4384
4385 #[test]
4386 fn test_validate_compatibility_level() {
4387 assert!(validate_compatibility_level("BACKWARD").is_ok());
4388 assert!(validate_compatibility_level("BACKWARD_TRANSITIVE").is_ok());
4389 assert!(validate_compatibility_level("FORWARD").is_ok());
4390 assert!(validate_compatibility_level("FORWARD_TRANSITIVE").is_ok());
4391 assert!(validate_compatibility_level("FULL").is_ok());
4392 assert!(validate_compatibility_level("FULL_TRANSITIVE").is_ok());
4393 assert!(validate_compatibility_level("NONE").is_ok());
4394 assert!(validate_compatibility_level("invalid").is_err());
4395 assert!(validate_compatibility_level("backward").is_err()); }
4397
4398 #[test]
4399 fn test_schema_format_spec_default() {
4400 let spec = SchemaFormatSpec::default();
4401 assert!(spec.avro);
4402 assert!(spec.json_schema);
4403 assert!(spec.protobuf);
4404 assert!(spec.strict_validation);
4405 }
4406
4407 #[test]
4408 fn test_schema_contexts_spec_default() {
4409 let spec = SchemaContextsSpec::default();
4410 assert!(!spec.enabled);
4411 assert_eq!(spec.max_contexts, 0);
4412 assert!(spec.predefined.is_empty());
4413 }
4414
4415 #[test]
4416 fn test_schema_validation_spec_default() {
4417 let spec = SchemaValidationSpec::default();
4418 assert!(!spec.enabled);
4419 assert_eq!(spec.max_schema_size, 1_048_576);
4420 assert!(spec.rules.is_empty());
4421 }
4422
4423 #[test]
4424 fn test_validate_rule_type() {
4425 assert!(validate_rule_type("regex").is_ok());
4426 assert!(validate_rule_type("field_exists").is_ok());
4427 assert!(validate_rule_type("field_type").is_ok());
4428 assert!(validate_rule_type("naming_convention").is_ok());
4429 assert!(validate_rule_type("documentation").is_ok());
4430 assert!(validate_rule_type("invalid").is_err());
4431 }
4432
4433 #[test]
4434 fn test_validate_validation_level() {
4435 assert!(validate_validation_level("error").is_ok());
4436 assert!(validate_validation_level("warning").is_ok());
4437 assert!(validate_validation_level("info").is_ok());
4438 assert!(validate_validation_level("invalid").is_err());
4439 }
4440
4441 #[test]
4442 fn test_schema_registry_auth_spec_default() {
4443 let spec = SchemaRegistryAuthSpec::default();
4444 assert!(!spec.enabled);
4445 assert!(spec.method.is_none());
4446 assert!(spec.credentials_secret_ref.is_none());
4447 }
4448
4449 #[test]
4450 fn test_validate_auth_method() {
4451 assert!(validate_auth_method("basic").is_ok());
4452 assert!(validate_auth_method("jwt").is_ok());
4453 assert!(validate_auth_method("cedar").is_ok());
4454 assert!(validate_auth_method("").is_ok());
4455 assert!(validate_auth_method("invalid").is_err());
4456 }
4457
4458 #[test]
4459 fn test_jwt_auth_spec_default() {
4460 let spec = JwtAuthSpec::default();
4461 assert!(spec.issuer_url.is_none());
4462 assert!(spec.jwks_url.is_none());
4463 assert!(spec.username_claim.is_empty());
4465 assert!(spec.roles_claim.is_empty());
4466 }
4467
4468 #[test]
4469 fn test_validate_user_role() {
4470 assert!(validate_user_role("admin").is_ok());
4471 assert!(validate_user_role("writer").is_ok());
4472 assert!(validate_user_role("reader").is_ok());
4473 assert!(validate_user_role("invalid").is_err());
4474 }
4475
4476 #[test]
4477 fn test_schema_registry_tls_spec_default() {
4478 let spec = SchemaRegistryTlsSpec::default();
4479 assert!(!spec.enabled);
4480 assert!(spec.cert_secret_name.is_none());
4481 assert!(!spec.mtls_enabled);
4482 assert!(!spec.broker_tls_enabled);
4483 }
4484
4485 #[test]
4486 fn test_schema_registry_metrics_spec_default() {
4487 let spec = SchemaRegistryMetricsSpec::default();
4488 assert!(spec.enabled);
4489 assert_eq!(spec.port, 9090);
4490 assert_eq!(spec.path, "/metrics");
4491 assert!(!spec.service_monitor_enabled);
4492 assert_eq!(spec.scrape_interval, "30s");
4493 }
4494
4495 #[test]
4496 fn test_external_registry_spec_default() {
4497 let spec = ExternalRegistrySpec::default();
4498 assert!(!spec.enabled);
4499 assert!(spec.registry_type.is_none());
4500 assert!(spec.confluent_url.is_none());
4501 assert!(spec.glue_registry_arn.is_none());
4502 assert_eq!(spec.sync_interval_seconds, 0);
4504 }
4505
4506 #[test]
4507 fn test_validate_external_registry_type() {
4508 assert!(validate_external_registry_type("confluent").is_ok());
4509 assert!(validate_external_registry_type("glue").is_ok());
4510 assert!(validate_external_registry_type("").is_ok());
4511 assert!(validate_external_registry_type("invalid").is_err());
4512 }
4513
4514 #[test]
4515 fn test_validate_sync_mode() {
4516 assert!(validate_sync_mode("mirror").is_ok());
4517 assert!(validate_sync_mode("push").is_ok());
4518 assert!(validate_sync_mode("bidirectional").is_ok());
4519 assert!(validate_sync_mode("").is_ok());
4520 assert!(validate_sync_mode("invalid").is_err());
4521 }
4522
4523 #[test]
4524 fn test_schema_registry_spec_get_image_default() {
4525 let spec = RivvenSchemaRegistrySpec {
4526 cluster_ref: ClusterReference {
4527 name: "test".to_string(),
4528 namespace: None,
4529 },
4530 replicas: 1,
4531 version: "0.0.1".to_string(),
4532 image: None,
4533 image_pull_policy: "IfNotPresent".to_string(),
4534 image_pull_secrets: vec![],
4535 resources: None,
4536 server: SchemaRegistryServerSpec::default(),
4537 storage: SchemaRegistryStorageSpec::default(),
4538 compatibility: SchemaCompatibilitySpec::default(),
4539 schemas: SchemaFormatSpec::default(),
4540 contexts: SchemaContextsSpec::default(),
4541 validation: SchemaValidationSpec::default(),
4542 auth: SchemaRegistryAuthSpec::default(),
4543 tls: SchemaRegistryTlsSpec::default(),
4544 metrics: SchemaRegistryMetricsSpec::default(),
4545 external: ExternalRegistrySpec::default(),
4546 pod_annotations: BTreeMap::new(),
4547 pod_labels: BTreeMap::new(),
4548 env: vec![],
4549 node_selector: BTreeMap::new(),
4550 tolerations: vec![],
4551 affinity: None,
4552 service_account: None,
4553 security_context: None,
4554 container_security_context: None,
4555 liveness_probe: ProbeSpec::default(),
4556 readiness_probe: ProbeSpec::default(),
4557 pod_disruption_budget: PdbSpec::default(),
4558 };
4559 assert_eq!(spec.get_image(), "ghcr.io/hupe1980/rivven-schema:0.0.1");
4560 }
4561
4562 #[test]
4563 fn test_schema_registry_spec_get_image_custom() {
4564 let spec = RivvenSchemaRegistrySpec {
4565 cluster_ref: ClusterReference {
4566 name: "test".to_string(),
4567 namespace: None,
4568 },
4569 replicas: 1,
4570 version: "0.0.1".to_string(),
4571 image: Some("custom/schema-registry:latest".to_string()),
4572 image_pull_policy: "IfNotPresent".to_string(),
4573 image_pull_secrets: vec![],
4574 resources: None,
4575 server: SchemaRegistryServerSpec::default(),
4576 storage: SchemaRegistryStorageSpec::default(),
4577 compatibility: SchemaCompatibilitySpec::default(),
4578 schemas: SchemaFormatSpec::default(),
4579 contexts: SchemaContextsSpec::default(),
4580 validation: SchemaValidationSpec::default(),
4581 auth: SchemaRegistryAuthSpec::default(),
4582 tls: SchemaRegistryTlsSpec::default(),
4583 metrics: SchemaRegistryMetricsSpec::default(),
4584 external: ExternalRegistrySpec::default(),
4585 pod_annotations: BTreeMap::new(),
4586 pod_labels: BTreeMap::new(),
4587 env: vec![],
4588 node_selector: BTreeMap::new(),
4589 tolerations: vec![],
4590 affinity: None,
4591 service_account: None,
4592 security_context: None,
4593 container_security_context: None,
4594 liveness_probe: ProbeSpec::default(),
4595 readiness_probe: ProbeSpec::default(),
4596 pod_disruption_budget: PdbSpec::default(),
4597 };
4598 assert_eq!(spec.get_image(), "custom/schema-registry:latest");
4599 }
4600
4601 #[test]
4602 fn test_schema_registry_spec_get_labels() {
4603 let spec = RivvenSchemaRegistrySpec {
4604 cluster_ref: ClusterReference {
4605 name: "test".to_string(),
4606 namespace: None,
4607 },
4608 replicas: 1,
4609 version: "0.0.1".to_string(),
4610 image: None,
4611 image_pull_policy: "IfNotPresent".to_string(),
4612 image_pull_secrets: vec![],
4613 resources: None,
4614 server: SchemaRegistryServerSpec::default(),
4615 storage: SchemaRegistryStorageSpec::default(),
4616 compatibility: SchemaCompatibilitySpec::default(),
4617 schemas: SchemaFormatSpec::default(),
4618 contexts: SchemaContextsSpec::default(),
4619 validation: SchemaValidationSpec::default(),
4620 auth: SchemaRegistryAuthSpec::default(),
4621 tls: SchemaRegistryTlsSpec::default(),
4622 metrics: SchemaRegistryMetricsSpec::default(),
4623 external: ExternalRegistrySpec::default(),
4624 pod_annotations: BTreeMap::new(),
4625 pod_labels: BTreeMap::new(),
4626 env: vec![],
4627 node_selector: BTreeMap::new(),
4628 tolerations: vec![],
4629 affinity: None,
4630 service_account: None,
4631 security_context: None,
4632 container_security_context: None,
4633 liveness_probe: ProbeSpec::default(),
4634 readiness_probe: ProbeSpec::default(),
4635 pod_disruption_budget: PdbSpec::default(),
4636 };
4637
4638 let labels = spec.get_labels("my-registry");
4639 assert_eq!(
4640 labels.get("app.kubernetes.io/name"),
4641 Some(&"rivven-schema-registry".to_string())
4642 );
4643 assert_eq!(
4644 labels.get("app.kubernetes.io/instance"),
4645 Some(&"my-registry".to_string())
4646 );
4647 assert_eq!(
4648 labels.get("app.kubernetes.io/component"),
4649 Some(&"schema-registry".to_string())
4650 );
4651 }
4652
4653 #[test]
4654 fn test_schema_registry_condition_time_format() {
4655 let condition = SchemaRegistryCondition {
4656 condition_type: "Ready".to_string(),
4657 status: "True".to_string(),
4658 last_transition_time: Some(chrono::Utc::now().to_rfc3339()),
4659 reason: Some("AllReplicasReady".to_string()),
4660 message: Some("All replicas are ready".to_string()),
4661 };
4662 assert!(condition.last_transition_time.unwrap().contains('T'));
4663 }
4664
4665 #[test]
4666 fn test_validate_subject_compatibility_map() {
4667 let mut subjects = BTreeMap::new();
4668 subjects.insert("orders-value".to_string(), "BACKWARD".to_string());
4669 subjects.insert("users-value".to_string(), "FULL".to_string());
4670 assert!(validate_subject_compatibility_map(&subjects).is_ok());
4671
4672 let mut invalid = BTreeMap::new();
4674 invalid.insert("test".to_string(), "invalid".to_string());
4675 assert!(validate_subject_compatibility_map(&invalid).is_err());
4676
4677 let mut long_name = BTreeMap::new();
4679 long_name.insert("a".repeat(300), "BACKWARD".to_string());
4680 assert!(validate_subject_compatibility_map(&long_name).is_err());
4681 }
4682
4683 #[test]
4688 fn test_snapshot_cdc_config_spec_serde_defaults() {
4689 let json = r#"{}"#;
4691 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4692 assert_eq!(config.batch_size, 10_000);
4693 assert_eq!(config.parallel_tables, 4);
4694 assert_eq!(config.query_timeout_secs, 300);
4695 assert_eq!(config.throttle_delay_ms, 0);
4696 assert_eq!(config.max_retries, 3);
4697 assert!(config.include_tables.is_empty());
4698 assert!(config.exclude_tables.is_empty());
4699 }
4700
4701 #[test]
4702 fn test_snapshot_cdc_config_spec_validation() {
4703 let json = r#"{"batchSize": 50}"#;
4704 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4705 assert!(config.validate().is_err());
4706
4707 let json = r#"{"batchSize": 5000, "parallelTables": 50}"#;
4708 let config: SnapshotCdcConfigSpec = serde_json::from_str(json).unwrap();
4709 assert!(config.validate().is_err());
4710 }
4711
4712 #[test]
4713 fn test_incremental_snapshot_spec_serde_defaults() {
4714 let json = r#"{}"#;
4715 let config: IncrementalSnapshotSpec = serde_json::from_str(json).unwrap();
4716 assert!(!config.enabled);
4717 assert_eq!(config.chunk_size, 1024);
4718 assert!(config.watermark_strategy.is_empty());
4719 assert_eq!(config.max_concurrent_chunks, 1);
4720 }
4721
4722 #[test]
4723 fn test_validate_watermark_strategy() {
4724 assert!(validate_watermark_strategy("insert").is_ok());
4725 assert!(validate_watermark_strategy("update_and_insert").is_ok());
4726 assert!(validate_watermark_strategy("").is_ok());
4727 assert!(validate_watermark_strategy("invalid").is_err());
4728 }
4729
4730 #[test]
4731 fn test_heartbeat_cdc_spec_serde_defaults() {
4732 let json = r#"{}"#;
4733 let config: HeartbeatCdcSpec = serde_json::from_str(json).unwrap();
4734 assert!(!config.enabled);
4735 assert_eq!(config.interval_secs, 10);
4736 assert_eq!(config.max_lag_secs, 300);
4737 assert!(!config.emit_events);
4738 }
4739
4740 #[test]
4741 fn test_deduplication_cdc_spec_serde_defaults() {
4742 let json = r#"{}"#;
4743 let config: DeduplicationCdcSpec = serde_json::from_str(json).unwrap();
4744 assert!(!config.enabled);
4745 assert_eq!(config.bloom_expected_insertions, 100_000);
4746 assert_eq!(config.bloom_fpp, 0.01);
4747 assert_eq!(config.lru_size, 10_000);
4748 assert_eq!(config.window_secs, 3600);
4749 }
4750
4751 #[test]
4752 fn test_transaction_topic_spec_serde_defaults() {
4753 let json = r#"{}"#;
4754 let config: TransactionTopicSpec = serde_json::from_str(json).unwrap();
4755 assert!(!config.enabled);
4756 assert!(config.topic_name.is_none());
4757 assert!(config.include_data_collections);
4758 assert_eq!(config.min_events_threshold, 0);
4759 }
4760
4761 #[test]
4762 fn test_schema_change_topic_spec_serde_defaults() {
4763 let json = r#"{}"#;
4764 let config: SchemaChangeTopicSpec = serde_json::from_str(json).unwrap();
4765 assert!(!config.enabled);
4766 assert!(config.topic_name.is_none());
4767 assert!(config.include_columns);
4768 assert!(config.schemas.is_empty());
4769 }
4770
4771 #[test]
4772 fn test_tombstone_cdc_spec_serde_defaults() {
4773 let json = r#"{}"#;
4774 let config: TombstoneCdcSpec = serde_json::from_str(json).unwrap();
4775 assert!(!config.enabled);
4776 assert!(config.after_delete);
4777 assert!(config.behavior.is_empty());
4778 }
4779
4780 #[test]
4781 fn test_validate_tombstone_behavior() {
4782 assert!(validate_tombstone_behavior("emit_null").is_ok());
4783 assert!(validate_tombstone_behavior("emit_with_key").is_ok());
4784 assert!(validate_tombstone_behavior("").is_ok());
4785 assert!(validate_tombstone_behavior("invalid").is_err());
4786 }
4787
4788 #[test]
4789 fn test_field_encryption_spec_serde_defaults() {
4790 let json = r#"{}"#;
4791 let config: FieldEncryptionSpec = serde_json::from_str(json).unwrap();
4792 assert!(!config.enabled);
4793 assert!(config.key_secret_ref.is_none());
4794 assert!(config.fields.is_empty());
4795 assert_eq!(config.algorithm, "aes-256-gcm");
4796 }
4797
4798 #[test]
4799 fn test_read_only_replica_spec_serde_defaults() {
4800 let json = r#"{}"#;
4801 let config: ReadOnlyReplicaSpec = serde_json::from_str(json).unwrap();
4802 assert!(!config.enabled);
4803 assert_eq!(config.lag_threshold_ms, 5000);
4804 assert!(config.deduplicate);
4805 assert!(config.watermark_source.is_empty());
4806 }
4807
4808 #[test]
4809 fn test_validate_watermark_source() {
4810 assert!(validate_watermark_source("primary").is_ok());
4811 assert!(validate_watermark_source("replica").is_ok());
4812 assert!(validate_watermark_source("").is_ok());
4813 assert!(validate_watermark_source("invalid").is_err());
4814 }
4815
4816 #[test]
4817 fn test_event_router_spec_serde_defaults() {
4818 let json = r#"{}"#;
4819 let config: EventRouterSpec = serde_json::from_str(json).unwrap();
4820 assert!(!config.enabled);
4821 assert!(config.default_destination.is_none());
4822 assert!(config.dead_letter_queue.is_none());
4823 assert!(!config.drop_unroutable);
4824 assert!(config.rules.is_empty());
4825 }
4826
4827 #[test]
4828 fn test_validate_route_condition_type() {
4829 assert!(validate_route_condition_type("always").is_ok());
4830 assert!(validate_route_condition_type("table").is_ok());
4831 assert!(validate_route_condition_type("table_pattern").is_ok());
4832 assert!(validate_route_condition_type("schema").is_ok());
4833 assert!(validate_route_condition_type("operation").is_ok());
4834 assert!(validate_route_condition_type("field_equals").is_ok());
4835 assert!(validate_route_condition_type("field_exists").is_ok());
4836 assert!(validate_route_condition_type("invalid").is_err());
4837 }
4838
4839 #[test]
4840 fn test_partitioner_spec_serde_defaults() {
4841 let json = r#"{}"#;
4842 let config: PartitionerSpec = serde_json::from_str(json).unwrap();
4843 assert!(!config.enabled);
4844 assert_eq!(config.num_partitions, 16);
4845 assert_eq!(config.strategy, "key_hash");
4846 assert!(config.key_columns.is_empty());
4847 }
4848
4849 #[test]
4850 fn test_validate_partition_strategy() {
4851 assert!(validate_partition_strategy("round_robin").is_ok());
4852 assert!(validate_partition_strategy("key_hash").is_ok());
4853 assert!(validate_partition_strategy("table_hash").is_ok());
4854 assert!(validate_partition_strategy("full_table_hash").is_ok());
4855 assert!(validate_partition_strategy("sticky").is_ok());
4856 assert!(validate_partition_strategy("invalid").is_err());
4857 }
4858
4859 #[test]
4860 fn test_validate_smt_transform_type() {
4861 assert!(validate_smt_transform_type("extract_new_record_state").is_ok());
4862 assert!(validate_smt_transform_type("mask_field").is_ok());
4863 assert!(validate_smt_transform_type("filter").is_ok());
4864 assert!(validate_smt_transform_type("flatten").is_ok());
4865 assert!(validate_smt_transform_type("cast").is_ok());
4866 assert!(validate_smt_transform_type("regex_router").is_ok());
4867 assert!(validate_smt_transform_type("content_router").is_ok());
4868 assert!(validate_smt_transform_type("invalid").is_err());
4869 }
4870
4871 #[test]
4872 fn test_parallel_cdc_spec_serde_defaults() {
4873 let json = r#"{}"#;
4874 let config: ParallelCdcSpec = serde_json::from_str(json).unwrap();
4875 assert!(!config.enabled);
4876 assert_eq!(config.concurrency, 4);
4877 assert_eq!(config.per_table_buffer, 1000);
4878 assert_eq!(config.output_buffer, 10_000);
4879 assert!(config.work_stealing);
4880 assert!(config.per_table_rate_limit.is_none());
4881 assert_eq!(config.shutdown_timeout_secs, 30);
4882 }
4883
4884 #[test]
4885 fn test_outbox_spec_serde_defaults() {
4886 let json = r#"{}"#;
4887 let config: OutboxSpec = serde_json::from_str(json).unwrap();
4888 assert!(!config.enabled);
4889 assert_eq!(config.table_name, "outbox");
4890 assert_eq!(config.poll_interval_ms, 100);
4891 assert_eq!(config.batch_size, 100);
4892 assert_eq!(config.max_retries, 3);
4893 assert_eq!(config.delivery_timeout_secs, 30);
4894 assert!(config.ordered_delivery);
4895 assert_eq!(config.retention_secs, 86400);
4896 assert_eq!(config.max_concurrency, 10);
4897 }
4898
4899 #[test]
4900 fn test_health_monitor_spec_serde_defaults() {
4901 let json = r#"{}"#;
4902 let config: HealthMonitorSpec = serde_json::from_str(json).unwrap();
4903 assert!(!config.enabled);
4904 assert_eq!(config.check_interval_secs, 10);
4905 assert_eq!(config.max_lag_ms, 30_000);
4906 assert_eq!(config.failure_threshold, 3);
4907 assert_eq!(config.success_threshold, 2);
4908 assert_eq!(config.check_timeout_secs, 5);
4909 assert!(config.auto_recovery);
4910 assert_eq!(config.recovery_delay_secs, 1);
4911 assert_eq!(config.max_recovery_delay_secs, 60);
4912 }
4913
4914 #[test]
4915 fn test_signal_table_spec_serde_defaults() {
4916 let json = r#"{}"#;
4917 let config: SignalTableSpec = serde_json::from_str(json).unwrap();
4918 assert!(!config.enabled);
4919 assert!(config.data_collection.is_none());
4920 assert!(config.topic.is_none());
4921 assert!(config.enabled_channels.is_empty());
4922 assert_eq!(config.poll_interval_ms, 1000);
4923 }
4924}