1use std::path::PathBuf;
4use std::time::Duration;
5
6#[derive(Debug, Clone)]
8pub struct EventBusConfig {
9 pub num_shards: u16,
17
18 pub ring_buffer_capacity: usize,
22
23 pub backpressure_mode: BackpressureMode,
25
26 pub batch: BatchConfig,
28
29 pub adapter: AdapterConfig,
31
32 pub scaling: Option<ScalingPolicy>,
35
36 pub adapter_timeout: Duration,
40
41 pub adapter_batch_retries: u32,
45
46 pub producer_nonce_path: Option<PathBuf>,
61}
62
63impl Default for EventBusConfig {
64 fn default() -> Self {
65 let cpus = num_cpus();
66 Self {
67 num_shards: cpus,
68 ring_buffer_capacity: 1 << 20, backpressure_mode: BackpressureMode::DropOldest,
70 batch: BatchConfig::default(),
71 adapter: AdapterConfig::Noop,
72 scaling: Some(ScalingPolicy::default_for_cpus(cpus)),
73 adapter_timeout: Duration::from_secs(30),
74 adapter_batch_retries: 0,
75 producer_nonce_path: None,
76 }
77 }
78}
79
80impl EventBusConfig {
81 pub fn builder() -> EventBusConfigBuilder {
83 EventBusConfigBuilder::default()
84 }
85
86 pub fn validate(&self) -> Result<(), ConfigError> {
88 if self.num_shards == 0 {
89 return Err(ConfigError::InvalidValue("num_shards must be > 0".into()));
90 }
91 if !self.ring_buffer_capacity.is_power_of_two() {
92 return Err(ConfigError::InvalidValue(
93 "ring_buffer_capacity must be a power of 2".into(),
94 ));
95 }
96 if self.ring_buffer_capacity < 1024 {
97 return Err(ConfigError::InvalidValue(
98 "ring_buffer_capacity must be >= 1024".into(),
99 ));
100 }
101 if self.adapter_timeout.is_zero() {
104 return Err(ConfigError::InvalidValue(
105 "adapter_timeout must be > 0".into(),
106 ));
107 }
108 if let BackpressureMode::Sample { rate: 0 } = self.backpressure_mode {
111 return Err(ConfigError::InvalidValue(
112 "BackpressureMode::Sample.rate must be > 0".into(),
113 ));
114 }
115 self.batch.validate()?;
116 if let Some(ref scaling) = self.scaling {
117 scaling
118 .validate()
119 .map_err(|e| ConfigError::InvalidValue(format!("scaling policy: {}", e)))?;
120 }
121 match &self.adapter {
126 AdapterConfig::Noop => {}
127 #[cfg(feature = "redis")]
128 AdapterConfig::Redis(c) => c
129 .validate()
130 .map_err(|e| ConfigError::InvalidValue(format!("redis adapter: {}", e)))?,
131 #[cfg(feature = "jetstream")]
132 AdapterConfig::JetStream(c) => c
133 .validate()
134 .map_err(|e| ConfigError::InvalidValue(format!("jetstream adapter: {}", e)))?,
135 #[cfg(feature = "net")]
136 AdapterConfig::Net(_) => {} }
139 Ok(())
140 }
141}
142
143#[derive(Debug, Clone)]
145enum ScalingConfig {
146 Default,
148 Disabled,
150 Policy(ScalingPolicy),
152}
153
154#[derive(Debug, Default)]
156pub struct EventBusConfigBuilder {
157 num_shards: Option<u16>,
158 ring_buffer_capacity: Option<usize>,
159 backpressure_mode: Option<BackpressureMode>,
160 batch: Option<BatchConfig>,
161 adapter: Option<AdapterConfig>,
162 scaling: Option<ScalingConfig>,
163 adapter_timeout: Option<Duration>,
164 adapter_batch_retries: Option<u32>,
165 producer_nonce_path: Option<PathBuf>,
166}
167
168impl EventBusConfigBuilder {
169 pub fn num_shards(mut self, n: u16) -> Self {
171 self.num_shards = Some(n);
172 self
173 }
174
175 pub fn ring_buffer_capacity(mut self, cap: usize) -> Self {
177 self.ring_buffer_capacity = Some(cap);
178 self
179 }
180
181 pub fn backpressure_mode(mut self, mode: BackpressureMode) -> Self {
183 self.backpressure_mode = Some(mode);
184 self
185 }
186
187 pub fn batch(mut self, config: BatchConfig) -> Self {
189 self.batch = Some(config);
190 self
191 }
192
193 pub fn adapter(mut self, config: AdapterConfig) -> Self {
195 self.adapter = Some(config);
196 self
197 }
198
199 pub fn scaling(mut self, policy: ScalingPolicy) -> Self {
201 self.scaling = Some(ScalingConfig::Policy(policy));
202 self
203 }
204
205 pub fn with_dynamic_scaling(mut self) -> Self {
208 self.scaling = Some(ScalingConfig::Default);
209 self
210 }
211
212 pub fn without_scaling(mut self) -> Self {
214 self.scaling = Some(ScalingConfig::Disabled);
215 self
216 }
217
218 pub fn adapter_timeout(mut self, timeout: Duration) -> Self {
220 self.adapter_timeout = Some(timeout);
221 self
222 }
223
224 pub fn adapter_batch_retries(mut self, retries: u32) -> Self {
227 self.adapter_batch_retries = Some(retries);
228 self
229 }
230
231 pub fn producer_nonce_path(mut self, path: impl Into<PathBuf>) -> Self {
237 self.producer_nonce_path = Some(path.into());
238 self
239 }
240
241 pub fn build(self) -> Result<EventBusConfig, ConfigError> {
243 let num_shards = self.num_shards.unwrap_or_else(num_cpus);
244 let scaling = match self.scaling {
245 Some(ScalingConfig::Policy(policy)) => Some(policy),
246 Some(ScalingConfig::Default) | None => {
247 Some(ScalingPolicy::default_for_cpus(num_shards))
248 }
249 Some(ScalingConfig::Disabled) => None,
250 };
251 let config = EventBusConfig {
252 num_shards,
253 ring_buffer_capacity: self.ring_buffer_capacity.unwrap_or(1 << 20),
254 backpressure_mode: self
255 .backpressure_mode
256 .unwrap_or(BackpressureMode::DropOldest),
257 batch: self.batch.unwrap_or_default(),
258 adapter: self.adapter.unwrap_or(AdapterConfig::Noop),
259 scaling,
260 adapter_timeout: self.adapter_timeout.unwrap_or(Duration::from_secs(30)),
261 adapter_batch_retries: self.adapter_batch_retries.unwrap_or(0),
262 producer_nonce_path: self.producer_nonce_path,
263 };
264 config.validate()?;
265 Ok(config)
266 }
267}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub enum BackpressureMode {
272 DropNewest,
274
275 DropOldest,
277
278 FailProducer,
280
281 Sample {
283 rate: u32,
285 },
286}
287
288#[derive(Debug, Clone)]
290pub struct BatchConfig {
291 pub min_size: usize,
294
295 pub max_size: usize,
298
299 pub max_delay: Duration,
302
303 pub adaptive: bool,
306
307 pub velocity_window: Duration,
310}
311
312impl Default for BatchConfig {
313 fn default() -> Self {
314 Self {
315 min_size: 1_000,
316 max_size: 10_000,
317 max_delay: Duration::from_millis(10),
318 adaptive: true,
319 velocity_window: Duration::from_millis(100),
320 }
321 }
322}
323
324impl BatchConfig {
325 pub const MAX_BATCH_SIZE_LIMIT: usize = 1_000_000;
335
336 pub fn validate(&self) -> Result<(), ConfigError> {
338 if self.min_size == 0 {
339 return Err(ConfigError::InvalidValue("min_size must be > 0".into()));
340 }
341 if self.max_size < self.min_size {
342 return Err(ConfigError::InvalidValue(
343 "max_size must be >= min_size".into(),
344 ));
345 }
346 if self.max_size > Self::MAX_BATCH_SIZE_LIMIT {
347 return Err(ConfigError::InvalidValue(format!(
348 "max_size must be <= {} (hostile-config guard against \
349 `current_batch_size * 3 + target` overflow in adaptive batching)",
350 Self::MAX_BATCH_SIZE_LIMIT,
351 )));
352 }
353 if self.max_delay.is_zero() {
354 return Err(ConfigError::InvalidValue("max_delay must be > 0".into()));
355 }
356 if self.adaptive && self.velocity_window.is_zero() {
360 return Err(ConfigError::InvalidValue(
361 "velocity_window must be > 0 when adaptive batching is enabled".into(),
362 ));
363 }
364 Ok(())
365 }
366
367 pub fn high_throughput() -> Self {
369 Self {
370 min_size: 5_000,
371 max_size: 50_000,
372 max_delay: Duration::from_millis(5),
373 adaptive: true,
374 velocity_window: Duration::from_millis(50),
375 }
376 }
377
378 pub fn low_latency() -> Self {
380 Self {
381 min_size: 100,
382 max_size: 1_000,
383 max_delay: Duration::from_millis(1),
384 adaptive: true,
385 velocity_window: Duration::from_millis(20),
386 }
387 }
388}
389
390#[derive(Debug, Clone)]
392pub enum AdapterConfig {
393 Noop,
396
397 #[cfg(feature = "redis")]
399 Redis(RedisAdapterConfig),
400
401 #[cfg(feature = "jetstream")]
403 JetStream(JetStreamAdapterConfig),
404
405 #[cfg(feature = "net")]
408 Net(Box<crate::adapter::net::NetAdapterConfig>),
409}
410
411#[cfg(feature = "redis")]
413#[derive(Debug, Clone)]
414pub struct RedisAdapterConfig {
415 pub url: String,
418
419 pub prefix: String,
423
424 pub pipeline_size: usize,
427
428 pub pool_size: Option<usize>,
431
432 pub connect_timeout: Duration,
435
436 pub command_timeout: Duration,
439
440 pub max_stream_len: Option<usize>,
443}
444
445#[cfg(feature = "redis")]
446impl RedisAdapterConfig {
447 pub fn new(url: impl Into<String>) -> Self {
449 Self {
450 url: url.into(),
451 prefix: "net".into(),
452 pipeline_size: 1000,
453 pool_size: None,
454 connect_timeout: Duration::from_secs(5),
455 command_timeout: Duration::from_secs(1),
456 max_stream_len: None,
457 }
458 }
459
460 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
462 self.prefix = prefix.into();
463 self
464 }
465
466 pub fn with_pipeline_size(mut self, size: usize) -> Self {
468 self.pipeline_size = size;
469 self
470 }
471
472 pub fn with_pool_size(mut self, size: usize) -> Self {
474 self.pool_size = Some(size);
475 self
476 }
477
478 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
480 self.connect_timeout = timeout;
481 self
482 }
483
484 pub fn with_command_timeout(mut self, timeout: Duration) -> Self {
486 self.command_timeout = timeout;
487 self
488 }
489
490 pub fn with_max_stream_len(mut self, len: usize) -> Self {
492 self.max_stream_len = Some(len);
493 self
494 }
495
496 pub fn validate(&self) -> Result<(), ConfigError> {
500 if self.url.is_empty() {
501 return Err(ConfigError::InvalidValue(
502 "redis url must be non-empty".into(),
503 ));
504 }
505 if self.pipeline_size == 0 {
506 return Err(ConfigError::InvalidValue(
507 "redis pipeline_size must be > 0".into(),
508 ));
509 }
510 if self.connect_timeout.is_zero() {
511 return Err(ConfigError::InvalidValue(
512 "redis connect_timeout must be > 0".into(),
513 ));
514 }
515 if self.command_timeout.is_zero() {
516 return Err(ConfigError::InvalidValue(
517 "redis command_timeout must be > 0".into(),
518 ));
519 }
520 Ok(())
521 }
522}
523
524#[cfg(feature = "jetstream")]
526#[derive(Debug, Clone)]
527pub struct JetStreamAdapterConfig {
528 pub url: String,
531
532 pub prefix: String,
536
537 pub connect_timeout: Duration,
540
541 pub request_timeout: Duration,
544
545 pub max_messages: Option<i64>,
548
549 pub max_bytes: Option<i64>,
552
553 pub max_age: Option<Duration>,
556
557 pub replicas: usize,
560
561 pub dedup_window: Duration,
577}
578
579#[cfg(feature = "jetstream")]
580impl JetStreamAdapterConfig {
581 pub fn new(url: impl Into<String>) -> Self {
583 Self {
584 url: url.into(),
585 prefix: "net".into(),
586 connect_timeout: Duration::from_secs(5),
587 request_timeout: Duration::from_secs(5),
588 max_messages: None,
589 max_bytes: None,
590 max_age: None,
591 replicas: 1,
592 dedup_window: Duration::from_secs(3600),
593 }
594 }
595
596 pub fn with_dedup_window(mut self, window: Duration) -> Self {
599 self.dedup_window = window;
600 self
601 }
602
603 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
605 self.prefix = prefix.into();
606 self
607 }
608
609 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
611 self.connect_timeout = timeout;
612 self
613 }
614
615 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
617 self.request_timeout = timeout;
618 self
619 }
620
621 pub fn with_max_messages(mut self, max: i64) -> Self {
623 self.max_messages = Some(max);
624 self
625 }
626
627 pub fn with_max_bytes(mut self, max: i64) -> Self {
629 self.max_bytes = Some(max);
630 self
631 }
632
633 pub fn with_max_age(mut self, age: Duration) -> Self {
635 self.max_age = Some(age);
636 self
637 }
638
639 pub fn with_replicas(mut self, replicas: usize) -> Self {
641 self.replicas = replicas;
642 self
643 }
644
645 pub fn validate(&self) -> Result<(), ConfigError> {
649 if self.url.is_empty() {
650 return Err(ConfigError::InvalidValue(
651 "jetstream url must be non-empty".into(),
652 ));
653 }
654 if self.connect_timeout.is_zero() {
655 return Err(ConfigError::InvalidValue(
656 "jetstream connect_timeout must be > 0".into(),
657 ));
658 }
659 if self.request_timeout.is_zero() {
660 return Err(ConfigError::InvalidValue(
661 "jetstream request_timeout must be > 0".into(),
662 ));
663 }
664 if self.replicas == 0 {
665 return Err(ConfigError::InvalidValue(
666 "jetstream replicas must be >= 1".into(),
667 ));
668 }
669 if let Some(n) = self.max_messages {
679 if n < 0 {
680 return Err(ConfigError::InvalidValue(format!(
681 "jetstream max_messages must be non-negative (got {n})"
682 )));
683 }
684 }
685 if let Some(n) = self.max_bytes {
686 if n < 0 {
687 return Err(ConfigError::InvalidValue(format!(
688 "jetstream max_bytes must be non-negative (got {n})"
689 )));
690 }
691 }
692 Ok(())
693 }
694}
695
696#[derive(Debug, Clone)]
698pub enum ConfigError {
699 InvalidValue(String),
701}
702
703impl std::fmt::Display for ConfigError {
704 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
705 match self {
706 Self::InvalidValue(msg) => write!(f, "invalid configuration: {}", msg),
707 }
708 }
709}
710
711impl std::error::Error for ConfigError {}
712
713#[derive(Debug, Clone)]
715pub struct ScalingPolicy {
716 pub fill_ratio_threshold: f64,
719
720 pub push_latency_threshold_ns: u64,
723
724 pub flush_latency_threshold_us: u64,
727
728 pub min_shards: u16,
730
731 pub max_shards: u16,
733
734 pub cooldown: Duration,
737
738 pub scale_down_delay: Duration,
741
742 pub underutilized_threshold: f64,
745
746 pub metrics_window: Duration,
749
750 pub auto_scale: bool,
752}
753
754impl Default for ScalingPolicy {
755 fn default() -> Self {
756 Self::default_for_cpus(num_cpus())
757 }
758}
759
760impl ScalingPolicy {
761 pub fn default_for_cpus(cpus: u16) -> Self {
764 Self {
765 fill_ratio_threshold: 0.7,
766 push_latency_threshold_ns: 5,
767 flush_latency_threshold_us: 1000,
768 min_shards: 1,
769 max_shards: cpus,
770 cooldown: Duration::from_secs(1),
771 scale_down_delay: Duration::from_secs(10),
772 underutilized_threshold: 0.1,
773 metrics_window: Duration::from_millis(100),
774 auto_scale: true,
775 }
776 }
777
778 pub fn high_throughput() -> Self {
788 let cpus = num_cpus();
789 Self {
790 fill_ratio_threshold: 0.6,
791 push_latency_threshold_ns: 3,
792 flush_latency_threshold_us: 500,
793 min_shards: 4.min(cpus),
794 max_shards: cpus.saturating_mul(2),
797 cooldown: Duration::from_millis(500),
798 scale_down_delay: Duration::from_secs(30),
799 underutilized_threshold: 0.05,
800 metrics_window: Duration::from_millis(50),
801 auto_scale: true,
802 }
803 }
804
805 pub fn conservative() -> Self {
807 let cpus = num_cpus();
808 Self {
809 fill_ratio_threshold: 0.8,
810 push_latency_threshold_ns: 10,
811 flush_latency_threshold_us: 2000,
812 min_shards: 1,
813 max_shards: cpus,
814 cooldown: Duration::from_secs(5),
815 scale_down_delay: Duration::from_secs(60),
816 underutilized_threshold: 0.05,
817 metrics_window: Duration::from_millis(200),
818 auto_scale: true,
819 }
820 }
821
822 pub fn normalize(mut self) -> Self {
828 if self.max_shards < self.min_shards {
829 self.max_shards = self.min_shards;
830 }
831 self
832 }
833
834 pub fn validate(&self) -> Result<(), ConfigError> {
847 if !self.fill_ratio_threshold.is_finite() {
848 return Err(ConfigError::InvalidValue(
849 "fill_ratio_threshold must be finite (NaN/±inf rejected)".into(),
850 ));
851 }
852 if self.fill_ratio_threshold <= 0.0 || self.fill_ratio_threshold > 1.0 {
853 return Err(ConfigError::InvalidValue(
854 "fill_ratio_threshold must be in (0.0, 1.0]".into(),
855 ));
856 }
857 if !self.underutilized_threshold.is_finite() {
858 return Err(ConfigError::InvalidValue(
859 "underutilized_threshold must be finite (NaN/±inf rejected)".into(),
860 ));
861 }
862 if self.underutilized_threshold < 0.0 || self.underutilized_threshold > 1.0 {
863 return Err(ConfigError::InvalidValue(
864 "underutilized_threshold must be in [0.0, 1.0]".into(),
865 ));
866 }
867 if self.min_shards == 0 {
868 return Err(ConfigError::InvalidValue("min_shards must be > 0".into()));
869 }
870 if self.max_shards < self.min_shards {
871 return Err(ConfigError::InvalidValue(
872 "max_shards must be >= min_shards".into(),
873 ));
874 }
875 if self.cooldown.is_zero() {
880 return Err(ConfigError::InvalidValue("cooldown must be > 0".into()));
881 }
882 if self.metrics_window.is_zero() {
883 return Err(ConfigError::InvalidValue(
884 "metrics_window must be > 0".into(),
885 ));
886 }
887 if self.scale_down_delay.is_zero() {
888 return Err(ConfigError::InvalidValue(
889 "scale_down_delay must be > 0".into(),
890 ));
891 }
892 Ok(())
893 }
894}
895
896fn num_cpus() -> u16 {
898 std::thread::available_parallelism()
899 .map(|n| u16::try_from(n.get()).unwrap_or(u16::MAX))
900 .unwrap_or(1)
901}
902
903#[cfg(test)]
904mod tests {
905 use super::*;
906
907 #[test]
908 fn test_default_config() {
909 let config = EventBusConfig::default();
910 assert!(config.validate().is_ok());
911 assert!(config.num_shards > 0);
912 assert!(config.ring_buffer_capacity.is_power_of_two());
913 }
914
915 #[test]
916 fn test_builder() {
917 let config = EventBusConfig::builder()
918 .num_shards(8)
919 .ring_buffer_capacity(1 << 16)
920 .backpressure_mode(BackpressureMode::FailProducer)
921 .build()
922 .unwrap();
923
924 assert_eq!(config.num_shards, 8);
925 assert_eq!(config.ring_buffer_capacity, 65536);
926 assert_eq!(config.backpressure_mode, BackpressureMode::FailProducer);
927 }
928
929 #[test]
930 fn test_invalid_ring_buffer_capacity() {
931 let result = EventBusConfig::builder()
932 .ring_buffer_capacity(1000) .build();
934
935 assert!(result.is_err());
936 }
937
938 #[test]
939 fn test_batch_config_presets() {
940 let high = BatchConfig::high_throughput();
941 assert!(high.validate().is_ok());
942 assert!(high.max_size > high.min_size);
943
944 let low = BatchConfig::low_latency();
945 assert!(low.validate().is_ok());
946 assert!(low.max_delay < high.max_delay);
947 }
948
949 #[test]
950 fn test_scaling_enabled_by_default() {
951 let config = EventBusConfig::default();
952 assert!(config.scaling.is_some());
953
954 let policy = config.scaling.unwrap();
955 assert_eq!(policy.max_shards, config.num_shards);
956 assert!(policy.auto_scale);
957 }
958
959 #[test]
960 fn test_builder_enables_scaling_by_default() {
961 let config = EventBusConfig::builder().num_shards(8).build().unwrap();
962
963 assert!(config.scaling.is_some());
964 let policy = config.scaling.unwrap();
965 assert_eq!(policy.max_shards, 8);
966 }
967
968 #[test]
969 fn test_builder_without_scaling() {
970 let config = EventBusConfig::builder()
971 .num_shards(4)
972 .without_scaling()
973 .build()
974 .unwrap();
975
976 assert!(config.scaling.is_none());
977 }
978
979 #[test]
980 fn test_with_dynamic_scaling_respects_num_shards() {
981 let config = EventBusConfig::builder()
983 .num_shards(8)
984 .with_dynamic_scaling()
985 .build()
986 .unwrap();
987
988 assert!(config.scaling.is_some());
989 let policy = config.scaling.unwrap();
990 assert_eq!(policy.max_shards, 8);
991
992 let config2 = EventBusConfig::builder()
994 .with_dynamic_scaling()
995 .num_shards(16)
996 .build()
997 .unwrap();
998
999 assert!(config2.scaling.is_some());
1000 let policy2 = config2.scaling.unwrap();
1001 assert_eq!(policy2.max_shards, 16);
1002 }
1003
1004 #[test]
1005 fn test_scaling_policy_presets() {
1006 let high = ScalingPolicy::high_throughput();
1007 assert!(high.validate().is_ok());
1008 assert!(high.max_shards >= high.min_shards);
1009
1010 let conservative = ScalingPolicy::conservative();
1011 assert!(conservative.validate().is_ok());
1012 assert!(conservative.cooldown > high.cooldown);
1013 }
1014
1015 #[test]
1016 fn test_scaling_policy_validation() {
1017 let mut policy = ScalingPolicy {
1018 underutilized_threshold: 0.0,
1019 ..Default::default()
1020 };
1021
1022 assert!(policy.validate().is_ok());
1024 policy.underutilized_threshold = 0.5;
1025 assert!(policy.validate().is_ok());
1026 policy.underutilized_threshold = 1.0;
1027 assert!(policy.validate().is_ok());
1028
1029 policy.underutilized_threshold = -0.1;
1031 assert!(policy.validate().is_err());
1032 policy.underutilized_threshold = 1.1;
1033 assert!(policy.validate().is_err());
1034
1035 policy.underutilized_threshold = 0.1;
1037 policy.fill_ratio_threshold = 0.0;
1038 assert!(policy.validate().is_err());
1039 policy.fill_ratio_threshold = 1.1;
1040 assert!(policy.validate().is_err());
1041 }
1042
1043 #[test]
1054 fn validate_rejects_nan_fill_ratio_threshold() {
1055 let policy = ScalingPolicy {
1056 fill_ratio_threshold: f64::NAN,
1057 ..Default::default()
1058 };
1059 assert!(
1060 policy.validate().is_err(),
1061 "NaN fill_ratio_threshold must be rejected",
1062 );
1063 }
1064
1065 #[test]
1066 fn validate_rejects_nan_underutilized_threshold() {
1067 let policy = ScalingPolicy {
1068 underutilized_threshold: f64::NAN,
1069 ..Default::default()
1070 };
1071 assert!(
1072 policy.validate().is_err(),
1073 "NaN underutilized_threshold must be rejected",
1074 );
1075 }
1076
1077 #[test]
1088 fn validate_rejects_infinity_thresholds() {
1089 let p1 = ScalingPolicy {
1090 fill_ratio_threshold: f64::INFINITY,
1091 ..Default::default()
1092 };
1093 assert!(p1.validate().is_err());
1094
1095 let p2 = ScalingPolicy {
1096 fill_ratio_threshold: f64::NEG_INFINITY,
1097 ..Default::default()
1098 };
1099 assert!(p2.validate().is_err());
1100
1101 let p3 = ScalingPolicy {
1102 underutilized_threshold: f64::INFINITY,
1103 ..Default::default()
1104 };
1105 assert!(p3.validate().is_err());
1106
1107 let p4 = ScalingPolicy {
1108 underutilized_threshold: f64::NEG_INFINITY,
1109 ..Default::default()
1110 };
1111 assert!(p4.validate().is_err());
1112 }
1113
1114 #[test]
1115 fn test_config_validates_scaling_policy() {
1116 let invalid_policy = ScalingPolicy {
1118 min_shards: 10,
1119 max_shards: 5, ..Default::default()
1121 };
1122
1123 let result = EventBusConfig::builder()
1124 .num_shards(4)
1125 .scaling(invalid_policy)
1126 .build();
1127
1128 assert!(result.is_err());
1129
1130 let invalid_policy2 = ScalingPolicy {
1132 fill_ratio_threshold: 1.5, ..Default::default()
1134 };
1135
1136 let result2 = EventBusConfig::builder()
1137 .num_shards(4)
1138 .scaling(invalid_policy2)
1139 .build();
1140
1141 assert!(result2.is_err());
1142 }
1143
1144 #[test]
1147 fn test_high_throughput_max_shards_no_overflow() {
1148 let policy = ScalingPolicy::high_throughput();
1149 assert!(policy.max_shards >= policy.min_shards);
1150 assert!(policy.validate().is_ok());
1151 }
1152
1153 #[test]
1156 fn test_validate_rejects_sample_rate_zero() {
1157 let result = EventBusConfig::builder()
1158 .backpressure_mode(BackpressureMode::Sample { rate: 0 })
1159 .build();
1160 assert!(
1161 result.is_err(),
1162 "BackpressureMode::Sample.rate == 0 must reject"
1163 );
1164 }
1165
1166 #[test]
1176 fn batch_config_rejects_max_size_above_limit() {
1177 let at_limit = BatchConfig {
1179 max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT,
1180 ..Default::default()
1181 };
1182 assert!(
1183 at_limit.validate().is_ok(),
1184 "max_size at MAX_BATCH_SIZE_LIMIT must be valid"
1185 );
1186
1187 let above = BatchConfig {
1189 max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT + 1,
1190 ..Default::default()
1191 };
1192 assert!(
1193 above.validate().is_err(),
1194 "max_size > MAX_BATCH_SIZE_LIMIT must reject — adaptive \
1195 arithmetic overflows past this cap"
1196 );
1197
1198 let hostile = BatchConfig {
1200 max_size: usize::MAX,
1201 ..Default::default()
1202 };
1203 assert!(
1204 hostile.validate().is_err(),
1205 "max_size = usize::MAX must reject (regression: \
1206 current_batch_size * 3 + target overflow)"
1207 );
1208 }
1209
1210 #[test]
1213 fn test_validate_rejects_zero_velocity_window_when_adaptive() {
1214 let bad = BatchConfig {
1215 adaptive: true,
1216 velocity_window: Duration::ZERO,
1217 ..Default::default()
1218 };
1219 assert!(bad.validate().is_err());
1220
1221 let ok = BatchConfig {
1223 adaptive: false,
1224 velocity_window: Duration::ZERO,
1225 ..Default::default()
1226 };
1227 assert!(ok.validate().is_ok());
1228 }
1229
1230 #[test]
1233 fn test_validate_rejects_zero_adapter_timeout() {
1234 let config = EventBusConfig {
1235 adapter_timeout: Duration::ZERO,
1236 ..EventBusConfig::default()
1237 };
1238 assert!(config.validate().is_err());
1239 }
1240
1241 #[test]
1245 fn test_validate_rejects_zero_scaling_durations() {
1246 let base = ScalingPolicy::default();
1247
1248 let mut p = base.clone();
1249 p.cooldown = Duration::ZERO;
1250 assert!(p.validate().is_err());
1251
1252 let mut p = base.clone();
1253 p.metrics_window = Duration::ZERO;
1254 assert!(p.validate().is_err());
1255
1256 let mut p = base;
1257 p.scale_down_delay = Duration::ZERO;
1258 assert!(p.validate().is_err());
1259 }
1260
1261 #[cfg(feature = "redis")]
1265 #[test]
1266 fn test_validate_redis_pipeline_size_zero_rejected() {
1267 let mut redis = RedisAdapterConfig::new("redis://localhost:6379");
1268 redis.pipeline_size = 0;
1269
1270 let result = EventBusConfig::builder()
1271 .adapter(AdapterConfig::Redis(redis))
1272 .build();
1273 assert!(result.is_err(), "redis pipeline_size == 0 must reject");
1274 }
1275
1276 #[cfg(feature = "jetstream")]
1279 #[test]
1280 fn test_validate_jetstream_replicas_zero_rejected() {
1281 let mut js = JetStreamAdapterConfig::new("nats://localhost:4222");
1282 js.replicas = 0;
1283
1284 let result = EventBusConfig::builder()
1285 .adapter(AdapterConfig::JetStream(js))
1286 .build();
1287 assert!(result.is_err(), "jetstream replicas == 0 must reject");
1288 }
1289
1290 #[cfg(feature = "jetstream")]
1295 #[test]
1296 fn validate_rejects_negative_max_messages() {
1297 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(-1);
1298 let err = js
1299 .validate()
1300 .expect_err("negative max_messages must reject");
1301 let msg = format!("{err}");
1302 assert!(
1303 msg.contains("max_messages"),
1304 "error must mention the field, got: {msg}"
1305 );
1306 }
1307
1308 #[cfg(feature = "jetstream")]
1309 #[test]
1310 fn validate_rejects_negative_max_bytes() {
1311 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_bytes(-100);
1312 let err = js.validate().expect_err("negative max_bytes must reject");
1313 let msg = format!("{err}");
1314 assert!(
1315 msg.contains("max_bytes"),
1316 "error must mention the field, got: {msg}"
1317 );
1318 }
1319
1320 #[cfg(feature = "jetstream")]
1321 #[test]
1322 fn validate_accepts_zero_and_positive_max_messages() {
1323 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(0);
1324 assert!(js.validate().is_ok(), "zero must be accepted");
1325 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(1_000_000);
1326 assert!(js.validate().is_ok(), "positive must be accepted");
1327 }
1328}