1use std::path::PathBuf;
4use std::time::Duration;
5
6#[derive(Debug, Clone)]
8pub struct EventBusConfig {
9 pub num_shards: u16,
17
18 pub ring_buffer_capacity: usize,
22
23 pub backpressure_mode: BackpressureMode,
25
26 pub batch: BatchConfig,
28
29 pub adapter: AdapterConfig,
31
32 pub scaling: Option<ScalingPolicy>,
35
36 pub adapter_timeout: Duration,
40
41 pub adapter_batch_retries: u32,
45
46 pub producer_nonce_path: Option<PathBuf>,
61}
62
63impl Default for EventBusConfig {
64 fn default() -> Self {
65 let cpus = num_cpus();
66 Self {
67 num_shards: cpus,
68 ring_buffer_capacity: 1 << 20, backpressure_mode: BackpressureMode::DropOldest,
70 batch: BatchConfig::default(),
71 adapter: AdapterConfig::Noop,
72 scaling: Some(ScalingPolicy::default_for_cpus(cpus)),
73 adapter_timeout: Duration::from_secs(30),
74 adapter_batch_retries: 0,
75 producer_nonce_path: None,
76 }
77 }
78}
79
80impl EventBusConfig {
81 pub fn builder() -> EventBusConfigBuilder {
83 EventBusConfigBuilder::default()
84 }
85
86 pub fn validate(&self) -> Result<(), ConfigError> {
88 if self.num_shards == 0 {
89 return Err(ConfigError::InvalidValue("num_shards must be > 0".into()));
90 }
91 if !self.ring_buffer_capacity.is_power_of_two() {
92 return Err(ConfigError::InvalidValue(
93 "ring_buffer_capacity must be a power of 2".into(),
94 ));
95 }
96 if self.ring_buffer_capacity < 1024 {
97 return Err(ConfigError::InvalidValue(
98 "ring_buffer_capacity must be >= 1024".into(),
99 ));
100 }
101 if self.adapter_timeout.is_zero() {
104 return Err(ConfigError::InvalidValue(
105 "adapter_timeout must be > 0".into(),
106 ));
107 }
108 if let BackpressureMode::Sample { rate: 0 } = self.backpressure_mode {
111 return Err(ConfigError::InvalidValue(
112 "BackpressureMode::Sample.rate must be > 0".into(),
113 ));
114 }
115 self.batch.validate()?;
116 if let Some(ref scaling) = self.scaling {
117 scaling
118 .validate()
119 .map_err(|e| ConfigError::InvalidValue(format!("scaling policy: {}", e)))?;
120 }
121 match &self.adapter {
126 AdapterConfig::Noop => {}
127 #[cfg(feature = "redis")]
128 AdapterConfig::Redis(c) => c
129 .validate()
130 .map_err(|e| ConfigError::InvalidValue(format!("redis adapter: {}", e)))?,
131 #[cfg(feature = "jetstream")]
132 AdapterConfig::JetStream(c) => c
133 .validate()
134 .map_err(|e| ConfigError::InvalidValue(format!("jetstream adapter: {}", e)))?,
135 #[cfg(feature = "net")]
136 AdapterConfig::Net(_) => {} }
139 Ok(())
140 }
141}
142
143#[derive(Debug, Clone)]
145enum ScalingConfig {
146 Default,
148 Disabled,
150 Policy(ScalingPolicy),
152}
153
154#[derive(Debug, Default)]
156pub struct EventBusConfigBuilder {
157 num_shards: Option<u16>,
158 ring_buffer_capacity: Option<usize>,
159 backpressure_mode: Option<BackpressureMode>,
160 batch: Option<BatchConfig>,
161 adapter: Option<AdapterConfig>,
162 scaling: Option<ScalingConfig>,
163 adapter_timeout: Option<Duration>,
164 adapter_batch_retries: Option<u32>,
165 producer_nonce_path: Option<PathBuf>,
166}
167
168impl EventBusConfigBuilder {
169 pub fn num_shards(mut self, n: u16) -> Self {
171 self.num_shards = Some(n);
172 self
173 }
174
175 pub fn ring_buffer_capacity(mut self, cap: usize) -> Self {
177 self.ring_buffer_capacity = Some(cap);
178 self
179 }
180
181 pub fn backpressure_mode(mut self, mode: BackpressureMode) -> Self {
183 self.backpressure_mode = Some(mode);
184 self
185 }
186
187 pub fn batch(mut self, config: BatchConfig) -> Self {
189 self.batch = Some(config);
190 self
191 }
192
193 pub fn adapter(mut self, config: AdapterConfig) -> Self {
195 self.adapter = Some(config);
196 self
197 }
198
199 pub fn scaling(mut self, policy: ScalingPolicy) -> Self {
201 self.scaling = Some(ScalingConfig::Policy(policy));
202 self
203 }
204
205 pub fn with_dynamic_scaling(mut self) -> Self {
208 self.scaling = Some(ScalingConfig::Default);
209 self
210 }
211
212 pub fn without_scaling(mut self) -> Self {
214 self.scaling = Some(ScalingConfig::Disabled);
215 self
216 }
217
218 pub fn adapter_timeout(mut self, timeout: Duration) -> Self {
220 self.adapter_timeout = Some(timeout);
221 self
222 }
223
224 pub fn adapter_batch_retries(mut self, retries: u32) -> Self {
227 self.adapter_batch_retries = Some(retries);
228 self
229 }
230
231 pub fn producer_nonce_path(mut self, path: impl Into<PathBuf>) -> Self {
237 self.producer_nonce_path = Some(path.into());
238 self
239 }
240
241 pub fn build(self) -> Result<EventBusConfig, ConfigError> {
243 let num_shards = self.num_shards.unwrap_or_else(num_cpus);
244 let scaling = match self.scaling {
245 Some(ScalingConfig::Policy(policy)) => Some(policy),
246 Some(ScalingConfig::Default) | None => {
247 Some(ScalingPolicy::default_for_cpus(num_shards))
248 }
249 Some(ScalingConfig::Disabled) => None,
250 };
251 let config = EventBusConfig {
252 num_shards,
253 ring_buffer_capacity: self.ring_buffer_capacity.unwrap_or(1 << 20),
254 backpressure_mode: self
255 .backpressure_mode
256 .unwrap_or(BackpressureMode::DropOldest),
257 batch: self.batch.unwrap_or_default(),
258 adapter: self.adapter.unwrap_or(AdapterConfig::Noop),
259 scaling,
260 adapter_timeout: self.adapter_timeout.unwrap_or(Duration::from_secs(30)),
261 adapter_batch_retries: self.adapter_batch_retries.unwrap_or(0),
262 producer_nonce_path: self.producer_nonce_path,
263 };
264 config.validate()?;
265 Ok(config)
266 }
267}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq)]
271pub enum BackpressureMode {
272 DropNewest,
274
275 DropOldest,
277
278 FailProducer,
280
281 Sample {
283 rate: u32,
285 },
286}
287
288#[derive(Debug, Clone)]
290pub struct BatchConfig {
291 pub min_size: usize,
294
295 pub max_size: usize,
298
299 pub max_delay: Duration,
302
303 pub adaptive: bool,
306
307 pub velocity_window: Duration,
310}
311
312impl Default for BatchConfig {
313 fn default() -> Self {
314 Self {
315 min_size: 1_000,
316 max_size: 10_000,
317 max_delay: Duration::from_millis(10),
318 adaptive: true,
319 velocity_window: Duration::from_millis(100),
320 }
321 }
322}
323
324impl BatchConfig {
325 pub const MAX_BATCH_SIZE_LIMIT: usize = 1_000_000;
335
336 pub fn validate(&self) -> Result<(), ConfigError> {
338 if self.min_size == 0 {
339 return Err(ConfigError::InvalidValue("min_size must be > 0".into()));
340 }
341 if self.max_size < self.min_size {
342 return Err(ConfigError::InvalidValue(
343 "max_size must be >= min_size".into(),
344 ));
345 }
346 if self.max_size > Self::MAX_BATCH_SIZE_LIMIT {
347 return Err(ConfigError::InvalidValue(format!(
348 "max_size must be <= {} (hostile-config guard against \
349 `current_batch_size * 3 + target` overflow in adaptive batching)",
350 Self::MAX_BATCH_SIZE_LIMIT,
351 )));
352 }
353 if self.max_delay.is_zero() {
354 return Err(ConfigError::InvalidValue("max_delay must be > 0".into()));
355 }
356 if self.adaptive && self.velocity_window.is_zero() {
360 return Err(ConfigError::InvalidValue(
361 "velocity_window must be > 0 when adaptive batching is enabled".into(),
362 ));
363 }
364 Ok(())
365 }
366
367 pub fn high_throughput() -> Self {
369 Self {
370 min_size: 5_000,
371 max_size: 50_000,
372 max_delay: Duration::from_millis(5),
373 adaptive: true,
374 velocity_window: Duration::from_millis(50),
375 }
376 }
377
378 pub fn low_latency() -> Self {
380 Self {
381 min_size: 100,
382 max_size: 1_000,
383 max_delay: Duration::from_millis(1),
384 adaptive: true,
385 velocity_window: Duration::from_millis(20),
386 }
387 }
388}
389
390#[derive(Debug, Clone)]
392pub enum AdapterConfig {
393 Noop,
396
397 #[cfg(feature = "redis")]
399 Redis(RedisAdapterConfig),
400
401 #[cfg(feature = "jetstream")]
403 JetStream(JetStreamAdapterConfig),
404
405 #[cfg(feature = "net")]
408 Net(Box<crate::adapter::net::NetAdapterConfig>),
409}
410
411#[cfg(feature = "redis")]
413#[derive(Debug, Clone)]
414pub struct RedisAdapterConfig {
415 pub url: String,
418
419 pub prefix: String,
423
424 pub pipeline_size: usize,
427
428 pub pool_size: Option<usize>,
431
432 pub connect_timeout: Duration,
435
436 pub command_timeout: Duration,
439
440 pub max_stream_len: Option<usize>,
443}
444
445#[cfg(feature = "redis")]
446impl RedisAdapterConfig {
447 pub fn new(url: impl Into<String>) -> Self {
449 Self {
450 url: url.into(),
451 prefix: "net".into(),
452 pipeline_size: 1000,
453 pool_size: None,
454 connect_timeout: Duration::from_secs(5),
455 command_timeout: Duration::from_secs(1),
456 max_stream_len: None,
457 }
458 }
459
460 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
462 self.prefix = prefix.into();
463 self
464 }
465
466 pub fn with_pipeline_size(mut self, size: usize) -> Self {
468 self.pipeline_size = size;
469 self
470 }
471
472 pub fn with_pool_size(mut self, size: usize) -> Self {
474 self.pool_size = Some(size);
475 self
476 }
477
478 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
480 self.connect_timeout = timeout;
481 self
482 }
483
484 pub fn with_command_timeout(mut self, timeout: Duration) -> Self {
486 self.command_timeout = timeout;
487 self
488 }
489
490 pub fn with_max_stream_len(mut self, len: usize) -> Self {
492 self.max_stream_len = Some(len);
493 self
494 }
495
496 pub fn validate(&self) -> Result<(), ConfigError> {
500 if self.url.is_empty() {
501 return Err(ConfigError::InvalidValue(
502 "redis url must be non-empty".into(),
503 ));
504 }
505 if self.pipeline_size == 0 {
506 return Err(ConfigError::InvalidValue(
507 "redis pipeline_size must be > 0".into(),
508 ));
509 }
510 if self.connect_timeout.is_zero() {
511 return Err(ConfigError::InvalidValue(
512 "redis connect_timeout must be > 0".into(),
513 ));
514 }
515 if self.command_timeout.is_zero() {
516 return Err(ConfigError::InvalidValue(
517 "redis command_timeout must be > 0".into(),
518 ));
519 }
520 Ok(())
521 }
522}
523
524#[cfg(feature = "jetstream")]
526#[derive(Debug, Clone)]
527pub struct JetStreamAdapterConfig {
528 pub url: String,
531
532 pub prefix: String,
536
537 pub connect_timeout: Duration,
540
541 pub request_timeout: Duration,
554
555 pub max_messages: Option<i64>,
558
559 pub max_bytes: Option<i64>,
562
563 pub max_age: Option<Duration>,
566
567 pub replicas: usize,
570
571 pub dedup_window: Duration,
587}
588
589#[cfg(feature = "jetstream")]
590impl JetStreamAdapterConfig {
591 pub fn new(url: impl Into<String>) -> Self {
593 Self {
594 url: url.into(),
595 prefix: "net".into(),
596 connect_timeout: Duration::from_secs(5),
597 request_timeout: Duration::from_secs(5),
598 max_messages: None,
599 max_bytes: None,
600 max_age: None,
601 replicas: 1,
602 dedup_window: Duration::from_secs(3600),
603 }
604 }
605
606 pub fn with_dedup_window(mut self, window: Duration) -> Self {
609 self.dedup_window = window;
610 self
611 }
612
613 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
615 self.prefix = prefix.into();
616 self
617 }
618
619 pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
621 self.connect_timeout = timeout;
622 self
623 }
624
625 pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
627 self.request_timeout = timeout;
628 self
629 }
630
631 pub fn with_max_messages(mut self, max: i64) -> Self {
633 self.max_messages = Some(max);
634 self
635 }
636
637 pub fn with_max_bytes(mut self, max: i64) -> Self {
639 self.max_bytes = Some(max);
640 self
641 }
642
643 pub fn with_max_age(mut self, age: Duration) -> Self {
645 self.max_age = Some(age);
646 self
647 }
648
649 pub fn with_replicas(mut self, replicas: usize) -> Self {
651 self.replicas = replicas;
652 self
653 }
654
655 pub fn validate(&self) -> Result<(), ConfigError> {
659 if self.url.is_empty() {
660 return Err(ConfigError::InvalidValue(
661 "jetstream url must be non-empty".into(),
662 ));
663 }
664 if self.connect_timeout.is_zero() {
665 return Err(ConfigError::InvalidValue(
666 "jetstream connect_timeout must be > 0".into(),
667 ));
668 }
669 if self.request_timeout.is_zero() {
670 return Err(ConfigError::InvalidValue(
671 "jetstream request_timeout must be > 0".into(),
672 ));
673 }
674 if self.replicas == 0 {
675 return Err(ConfigError::InvalidValue(
676 "jetstream replicas must be >= 1".into(),
677 ));
678 }
679 if let Some(n) = self.max_messages {
689 if n < 0 {
690 return Err(ConfigError::InvalidValue(format!(
691 "jetstream max_messages must be non-negative (got {n})"
692 )));
693 }
694 }
695 if let Some(n) = self.max_bytes {
696 if n < 0 {
697 return Err(ConfigError::InvalidValue(format!(
698 "jetstream max_bytes must be non-negative (got {n})"
699 )));
700 }
701 }
702 Ok(())
703 }
704}
705
706#[derive(Debug, Clone)]
708pub enum ConfigError {
709 InvalidValue(String),
711}
712
713impl std::fmt::Display for ConfigError {
714 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
715 match self {
716 Self::InvalidValue(msg) => write!(f, "invalid configuration: {}", msg),
717 }
718 }
719}
720
721impl std::error::Error for ConfigError {}
722
723#[derive(Debug, Clone)]
725pub struct ScalingPolicy {
726 pub fill_ratio_threshold: f64,
729
730 pub push_latency_threshold_ns: u64,
733
734 pub flush_latency_threshold_us: u64,
737
738 pub min_shards: u16,
740
741 pub max_shards: u16,
743
744 pub cooldown: Duration,
747
748 pub scale_down_delay: Duration,
751
752 pub underutilized_threshold: f64,
755
756 pub metrics_window: Duration,
759
760 pub auto_scale: bool,
762}
763
764impl Default for ScalingPolicy {
765 fn default() -> Self {
766 Self::default_for_cpus(num_cpus())
767 }
768}
769
770impl ScalingPolicy {
771 pub fn default_for_cpus(cpus: u16) -> Self {
774 Self {
775 fill_ratio_threshold: 0.7,
776 push_latency_threshold_ns: 5,
777 flush_latency_threshold_us: 1000,
778 min_shards: 1,
779 max_shards: cpus,
780 cooldown: Duration::from_secs(1),
781 scale_down_delay: Duration::from_secs(10),
782 underutilized_threshold: 0.1,
783 metrics_window: Duration::from_millis(100),
784 auto_scale: true,
785 }
786 }
787
788 pub fn high_throughput() -> Self {
798 let cpus = num_cpus();
799 Self {
800 fill_ratio_threshold: 0.6,
801 push_latency_threshold_ns: 3,
802 flush_latency_threshold_us: 500,
803 min_shards: 4.min(cpus),
804 max_shards: cpus.saturating_mul(2),
807 cooldown: Duration::from_millis(500),
808 scale_down_delay: Duration::from_secs(30),
809 underutilized_threshold: 0.05,
810 metrics_window: Duration::from_millis(50),
811 auto_scale: true,
812 }
813 }
814
815 pub fn conservative() -> Self {
817 let cpus = num_cpus();
818 Self {
819 fill_ratio_threshold: 0.8,
820 push_latency_threshold_ns: 10,
821 flush_latency_threshold_us: 2000,
822 min_shards: 1,
823 max_shards: cpus,
824 cooldown: Duration::from_secs(5),
825 scale_down_delay: Duration::from_secs(60),
826 underutilized_threshold: 0.05,
827 metrics_window: Duration::from_millis(200),
828 auto_scale: true,
829 }
830 }
831
832 pub fn normalize(mut self) -> Self {
838 if self.max_shards < self.min_shards {
839 self.max_shards = self.min_shards;
840 }
841 self
842 }
843
844 pub fn validate(&self) -> Result<(), ConfigError> {
857 if !self.fill_ratio_threshold.is_finite() {
858 return Err(ConfigError::InvalidValue(
859 "fill_ratio_threshold must be finite (NaN/±inf rejected)".into(),
860 ));
861 }
862 if self.fill_ratio_threshold <= 0.0 || self.fill_ratio_threshold > 1.0 {
863 return Err(ConfigError::InvalidValue(
864 "fill_ratio_threshold must be in (0.0, 1.0]".into(),
865 ));
866 }
867 if !self.underutilized_threshold.is_finite() {
868 return Err(ConfigError::InvalidValue(
869 "underutilized_threshold must be finite (NaN/±inf rejected)".into(),
870 ));
871 }
872 if self.underutilized_threshold < 0.0 || self.underutilized_threshold > 1.0 {
873 return Err(ConfigError::InvalidValue(
874 "underutilized_threshold must be in [0.0, 1.0]".into(),
875 ));
876 }
877 if self.min_shards == 0 {
878 return Err(ConfigError::InvalidValue("min_shards must be > 0".into()));
879 }
880 if self.max_shards < self.min_shards {
881 return Err(ConfigError::InvalidValue(
882 "max_shards must be >= min_shards".into(),
883 ));
884 }
885 if self.cooldown.is_zero() {
890 return Err(ConfigError::InvalidValue("cooldown must be > 0".into()));
891 }
892 if self.metrics_window.is_zero() {
893 return Err(ConfigError::InvalidValue(
894 "metrics_window must be > 0".into(),
895 ));
896 }
897 if self.scale_down_delay.is_zero() {
898 return Err(ConfigError::InvalidValue(
899 "scale_down_delay must be > 0".into(),
900 ));
901 }
902 Ok(())
903 }
904}
905
906fn num_cpus() -> u16 {
908 std::thread::available_parallelism()
909 .map(|n| u16::try_from(n.get()).unwrap_or(u16::MAX))
910 .unwrap_or(1)
911}
912
913#[cfg(test)]
914mod tests {
915 use super::*;
916
917 #[test]
918 fn test_default_config() {
919 let config = EventBusConfig::default();
920 assert!(config.validate().is_ok());
921 assert!(config.num_shards > 0);
922 assert!(config.ring_buffer_capacity.is_power_of_two());
923 }
924
925 #[test]
926 fn test_builder() {
927 let config = EventBusConfig::builder()
928 .num_shards(8)
929 .ring_buffer_capacity(1 << 16)
930 .backpressure_mode(BackpressureMode::FailProducer)
931 .build()
932 .unwrap();
933
934 assert_eq!(config.num_shards, 8);
935 assert_eq!(config.ring_buffer_capacity, 65536);
936 assert_eq!(config.backpressure_mode, BackpressureMode::FailProducer);
937 }
938
939 #[test]
940 fn test_invalid_ring_buffer_capacity() {
941 let result = EventBusConfig::builder()
942 .ring_buffer_capacity(1000) .build();
944
945 assert!(result.is_err());
946 }
947
948 #[test]
949 fn test_batch_config_presets() {
950 let high = BatchConfig::high_throughput();
951 assert!(high.validate().is_ok());
952 assert!(high.max_size > high.min_size);
953
954 let low = BatchConfig::low_latency();
955 assert!(low.validate().is_ok());
956 assert!(low.max_delay < high.max_delay);
957 }
958
959 #[test]
960 fn test_scaling_enabled_by_default() {
961 let config = EventBusConfig::default();
962 assert!(config.scaling.is_some());
963
964 let policy = config.scaling.unwrap();
965 assert_eq!(policy.max_shards, config.num_shards);
966 assert!(policy.auto_scale);
967 }
968
969 #[test]
970 fn test_builder_enables_scaling_by_default() {
971 let config = EventBusConfig::builder().num_shards(8).build().unwrap();
972
973 assert!(config.scaling.is_some());
974 let policy = config.scaling.unwrap();
975 assert_eq!(policy.max_shards, 8);
976 }
977
978 #[test]
979 fn test_builder_without_scaling() {
980 let config = EventBusConfig::builder()
981 .num_shards(4)
982 .without_scaling()
983 .build()
984 .unwrap();
985
986 assert!(config.scaling.is_none());
987 }
988
989 #[test]
990 fn test_with_dynamic_scaling_respects_num_shards() {
991 let config = EventBusConfig::builder()
993 .num_shards(8)
994 .with_dynamic_scaling()
995 .build()
996 .unwrap();
997
998 assert!(config.scaling.is_some());
999 let policy = config.scaling.unwrap();
1000 assert_eq!(policy.max_shards, 8);
1001
1002 let config2 = EventBusConfig::builder()
1004 .with_dynamic_scaling()
1005 .num_shards(16)
1006 .build()
1007 .unwrap();
1008
1009 assert!(config2.scaling.is_some());
1010 let policy2 = config2.scaling.unwrap();
1011 assert_eq!(policy2.max_shards, 16);
1012 }
1013
1014 #[test]
1015 fn test_scaling_policy_presets() {
1016 let high = ScalingPolicy::high_throughput();
1017 assert!(high.validate().is_ok());
1018 assert!(high.max_shards >= high.min_shards);
1019
1020 let conservative = ScalingPolicy::conservative();
1021 assert!(conservative.validate().is_ok());
1022 assert!(conservative.cooldown > high.cooldown);
1023 }
1024
1025 #[test]
1026 fn test_scaling_policy_validation() {
1027 let mut policy = ScalingPolicy {
1028 underutilized_threshold: 0.0,
1029 ..Default::default()
1030 };
1031
1032 assert!(policy.validate().is_ok());
1034 policy.underutilized_threshold = 0.5;
1035 assert!(policy.validate().is_ok());
1036 policy.underutilized_threshold = 1.0;
1037 assert!(policy.validate().is_ok());
1038
1039 policy.underutilized_threshold = -0.1;
1041 assert!(policy.validate().is_err());
1042 policy.underutilized_threshold = 1.1;
1043 assert!(policy.validate().is_err());
1044
1045 policy.underutilized_threshold = 0.1;
1047 policy.fill_ratio_threshold = 0.0;
1048 assert!(policy.validate().is_err());
1049 policy.fill_ratio_threshold = 1.1;
1050 assert!(policy.validate().is_err());
1051 }
1052
1053 #[test]
1064 fn validate_rejects_nan_fill_ratio_threshold() {
1065 let policy = ScalingPolicy {
1066 fill_ratio_threshold: f64::NAN,
1067 ..Default::default()
1068 };
1069 assert!(
1070 policy.validate().is_err(),
1071 "NaN fill_ratio_threshold must be rejected",
1072 );
1073 }
1074
1075 #[test]
1076 fn validate_rejects_nan_underutilized_threshold() {
1077 let policy = ScalingPolicy {
1078 underutilized_threshold: f64::NAN,
1079 ..Default::default()
1080 };
1081 assert!(
1082 policy.validate().is_err(),
1083 "NaN underutilized_threshold must be rejected",
1084 );
1085 }
1086
1087 #[test]
1098 fn validate_rejects_infinity_thresholds() {
1099 let p1 = ScalingPolicy {
1100 fill_ratio_threshold: f64::INFINITY,
1101 ..Default::default()
1102 };
1103 assert!(p1.validate().is_err());
1104
1105 let p2 = ScalingPolicy {
1106 fill_ratio_threshold: f64::NEG_INFINITY,
1107 ..Default::default()
1108 };
1109 assert!(p2.validate().is_err());
1110
1111 let p3 = ScalingPolicy {
1112 underutilized_threshold: f64::INFINITY,
1113 ..Default::default()
1114 };
1115 assert!(p3.validate().is_err());
1116
1117 let p4 = ScalingPolicy {
1118 underutilized_threshold: f64::NEG_INFINITY,
1119 ..Default::default()
1120 };
1121 assert!(p4.validate().is_err());
1122 }
1123
1124 #[test]
1125 fn test_config_validates_scaling_policy() {
1126 let invalid_policy = ScalingPolicy {
1128 min_shards: 10,
1129 max_shards: 5, ..Default::default()
1131 };
1132
1133 let result = EventBusConfig::builder()
1134 .num_shards(4)
1135 .scaling(invalid_policy)
1136 .build();
1137
1138 assert!(result.is_err());
1139
1140 let invalid_policy2 = ScalingPolicy {
1142 fill_ratio_threshold: 1.5, ..Default::default()
1144 };
1145
1146 let result2 = EventBusConfig::builder()
1147 .num_shards(4)
1148 .scaling(invalid_policy2)
1149 .build();
1150
1151 assert!(result2.is_err());
1152 }
1153
1154 #[test]
1157 fn test_high_throughput_max_shards_no_overflow() {
1158 let policy = ScalingPolicy::high_throughput();
1159 assert!(policy.max_shards >= policy.min_shards);
1160 assert!(policy.validate().is_ok());
1161 }
1162
1163 #[test]
1166 fn test_validate_rejects_sample_rate_zero() {
1167 let result = EventBusConfig::builder()
1168 .backpressure_mode(BackpressureMode::Sample { rate: 0 })
1169 .build();
1170 assert!(
1171 result.is_err(),
1172 "BackpressureMode::Sample.rate == 0 must reject"
1173 );
1174 }
1175
1176 #[test]
1186 fn batch_config_rejects_max_size_above_limit() {
1187 let at_limit = BatchConfig {
1189 max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT,
1190 ..Default::default()
1191 };
1192 assert!(
1193 at_limit.validate().is_ok(),
1194 "max_size at MAX_BATCH_SIZE_LIMIT must be valid"
1195 );
1196
1197 let above = BatchConfig {
1199 max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT + 1,
1200 ..Default::default()
1201 };
1202 assert!(
1203 above.validate().is_err(),
1204 "max_size > MAX_BATCH_SIZE_LIMIT must reject — adaptive \
1205 arithmetic overflows past this cap"
1206 );
1207
1208 let hostile = BatchConfig {
1210 max_size: usize::MAX,
1211 ..Default::default()
1212 };
1213 assert!(
1214 hostile.validate().is_err(),
1215 "max_size = usize::MAX must reject (regression: \
1216 current_batch_size * 3 + target overflow)"
1217 );
1218 }
1219
1220 #[test]
1223 fn test_validate_rejects_zero_velocity_window_when_adaptive() {
1224 let bad = BatchConfig {
1225 adaptive: true,
1226 velocity_window: Duration::ZERO,
1227 ..Default::default()
1228 };
1229 assert!(bad.validate().is_err());
1230
1231 let ok = BatchConfig {
1233 adaptive: false,
1234 velocity_window: Duration::ZERO,
1235 ..Default::default()
1236 };
1237 assert!(ok.validate().is_ok());
1238 }
1239
1240 #[test]
1243 fn test_validate_rejects_zero_adapter_timeout() {
1244 let config = EventBusConfig {
1245 adapter_timeout: Duration::ZERO,
1246 ..EventBusConfig::default()
1247 };
1248 assert!(config.validate().is_err());
1249 }
1250
1251 #[test]
1255 fn test_validate_rejects_zero_scaling_durations() {
1256 let base = ScalingPolicy::default();
1257
1258 let mut p = base.clone();
1259 p.cooldown = Duration::ZERO;
1260 assert!(p.validate().is_err());
1261
1262 let mut p = base.clone();
1263 p.metrics_window = Duration::ZERO;
1264 assert!(p.validate().is_err());
1265
1266 let mut p = base;
1267 p.scale_down_delay = Duration::ZERO;
1268 assert!(p.validate().is_err());
1269 }
1270
1271 #[cfg(feature = "redis")]
1275 #[test]
1276 fn test_validate_redis_pipeline_size_zero_rejected() {
1277 let mut redis = RedisAdapterConfig::new("redis://localhost:6379");
1278 redis.pipeline_size = 0;
1279
1280 let result = EventBusConfig::builder()
1281 .adapter(AdapterConfig::Redis(redis))
1282 .build();
1283 assert!(result.is_err(), "redis pipeline_size == 0 must reject");
1284 }
1285
1286 #[cfg(feature = "jetstream")]
1289 #[test]
1290 fn test_validate_jetstream_replicas_zero_rejected() {
1291 let mut js = JetStreamAdapterConfig::new("nats://localhost:4222");
1292 js.replicas = 0;
1293
1294 let result = EventBusConfig::builder()
1295 .adapter(AdapterConfig::JetStream(js))
1296 .build();
1297 assert!(result.is_err(), "jetstream replicas == 0 must reject");
1298 }
1299
1300 #[cfg(feature = "jetstream")]
1305 #[test]
1306 fn validate_rejects_negative_max_messages() {
1307 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(-1);
1308 let err = js
1309 .validate()
1310 .expect_err("negative max_messages must reject");
1311 let msg = format!("{err}");
1312 assert!(
1313 msg.contains("max_messages"),
1314 "error must mention the field, got: {msg}"
1315 );
1316 }
1317
1318 #[cfg(feature = "jetstream")]
1319 #[test]
1320 fn validate_rejects_negative_max_bytes() {
1321 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_bytes(-100);
1322 let err = js.validate().expect_err("negative max_bytes must reject");
1323 let msg = format!("{err}");
1324 assert!(
1325 msg.contains("max_bytes"),
1326 "error must mention the field, got: {msg}"
1327 );
1328 }
1329
1330 #[cfg(feature = "jetstream")]
1331 #[test]
1332 fn validate_accepts_zero_and_positive_max_messages() {
1333 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(0);
1334 assert!(js.validate().is_ok(), "zero must be accepted");
1335 let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(1_000_000);
1336 assert!(js.validate().is_ok(), "positive must be accepted");
1337 }
1338}