Skip to main content

net/
config.rs

1//! Configuration types for the Net event bus.
2
3use std::path::PathBuf;
4use std::time::Duration;
5
/// Top-level configuration for the event bus.
///
/// Obtain one via [`EventBusConfig::default`] or
/// [`EventBusConfig::builder`]; [`EventBusConfig::validate`] enforces the
/// per-field constraints noted below.
#[derive(Debug, Clone)]
pub struct EventBusConfig {
    /// Number of shards for parallel ingestion.
    /// Each shard has its own ring buffer and batch worker.
    /// Default: the value reported by
    /// `std::thread::available_parallelism` (1 if it cannot be queried,
    /// saturating at `u16::MAX`).
    ///
    /// `validate` rejects `0`; every other value is accepted as-is.
    pub num_shards: u16,

    /// Capacity of each shard's ring buffer (number of events).
    /// Must be a power of 2 for efficient modulo operations, and at
    /// least 1024 (both enforced by `validate`).
    /// Default: 1,048,576 (1M events per shard).
    pub ring_buffer_capacity: usize,

    /// Backpressure policy when ring buffers are full.
    /// Default: `BackpressureMode::DropOldest`.
    pub backpressure_mode: BackpressureMode,

    /// Batch aggregation configuration.
    /// Validated through `BatchConfig::validate`.
    pub batch: BatchConfig,

    /// Adapter configuration.
    /// Default: `AdapterConfig::Noop`.
    pub adapter: AdapterConfig,

    /// Dynamic scaling configuration.
    /// If None, dynamic scaling is disabled.
    /// Default: `Some(ScalingPolicy::default_for_cpus(..))` using the
    /// same CPU count as `num_shards`.
    pub scaling: Option<ScalingPolicy>,

    /// Timeout for adapter operations (init, on_batch, flush, shutdown).
    /// Prevents a hanging adapter from blocking the event bus.
    /// Must be non-zero (enforced by `validate`).
    /// Default: 30 seconds.
    pub adapter_timeout: Duration,

    /// Number of times to retry a failed on_batch before dropping the batch.
    /// 0 = no retries (drop immediately on failure, default).
    /// Retries use a fixed 100ms delay between attempts.
    pub adapter_batch_retries: u32,

    /// File path for the persistent producer nonce. When
    /// `Some`, the bus loads (or creates on first run) the u64
    /// nonce at this path on startup and stamps it on every
    /// outgoing batch. Adapters that key dedup on
    /// `(producer_nonce, shard, sequence_start, i)` then dedup
    /// retransmits across process restart — JetStream's
    /// `Nats-Msg-Id` is the canonical example.
    ///
    /// When `None` (default), the bus uses a per-process nonce
    /// sampled fresh at every startup (today's behavior). That's
    /// fine for in-memory adapters and for any deployment where
    /// "at-most-once across process restart" is acceptable;
    /// production JetStream / Redis deployments should set this
    /// to a stable path on local persistent storage.
    pub producer_nonce_path: Option<PathBuf>,
}
62
63impl Default for EventBusConfig {
64    fn default() -> Self {
65        let cpus = num_cpus();
66        Self {
67            num_shards: cpus,
68            ring_buffer_capacity: 1 << 20, // 1M events
69            backpressure_mode: BackpressureMode::DropOldest,
70            batch: BatchConfig::default(),
71            adapter: AdapterConfig::Noop,
72            scaling: Some(ScalingPolicy::default_for_cpus(cpus)),
73            adapter_timeout: Duration::from_secs(30),
74            adapter_batch_retries: 0,
75            producer_nonce_path: None,
76        }
77    }
78}
79
80impl EventBusConfig {
81    /// Create a new configuration builder.
82    pub fn builder() -> EventBusConfigBuilder {
83        EventBusConfigBuilder::default()
84    }
85
86    /// Validate the configuration.
87    pub fn validate(&self) -> Result<(), ConfigError> {
88        if self.num_shards == 0 {
89            return Err(ConfigError::InvalidValue("num_shards must be > 0".into()));
90        }
91        if !self.ring_buffer_capacity.is_power_of_two() {
92            return Err(ConfigError::InvalidValue(
93                "ring_buffer_capacity must be a power of 2".into(),
94            ));
95        }
96        if self.ring_buffer_capacity < 1024 {
97            return Err(ConfigError::InvalidValue(
98                "ring_buffer_capacity must be >= 1024".into(),
99            ));
100        }
101        // Adapter timeout of zero would make every adapter call time
102        // out instantly. Reject at config time.
103        if self.adapter_timeout.is_zero() {
104            return Err(ConfigError::InvalidValue(
105                "adapter_timeout must be > 0".into(),
106            ));
107        }
108        // `Sample { rate: 0 }` was accepted but crashed downstream
109        // sampling (counter % 0).
110        if let BackpressureMode::Sample { rate: 0 } = self.backpressure_mode {
111            return Err(ConfigError::InvalidValue(
112                "BackpressureMode::Sample.rate must be > 0".into(),
113            ));
114        }
115        self.batch.validate()?;
116        if let Some(ref scaling) = self.scaling {
117            scaling
118                .validate()
119                .map_err(|e| ConfigError::InvalidValue(format!("scaling policy: {}", e)))?;
120        }
121        // Recurse into adapter configs. Previously these were
122        // accepted blindly and zero-divisor fields like
123        // `RedisAdapterConfig::pipeline_size: 0` shipped through to
124        // runtime panics.
125        match &self.adapter {
126            AdapterConfig::Noop => {}
127            #[cfg(feature = "redis")]
128            AdapterConfig::Redis(c) => c
129                .validate()
130                .map_err(|e| ConfigError::InvalidValue(format!("redis adapter: {}", e)))?,
131            #[cfg(feature = "jetstream")]
132            AdapterConfig::JetStream(c) => c
133                .validate()
134                .map_err(|e| ConfigError::InvalidValue(format!("jetstream adapter: {}", e)))?,
135            #[cfg(feature = "net")]
136            AdapterConfig::Net(_) => {} // Net adapter has its own
137                                        // validation pipeline, not in scope here.
138        }
139        Ok(())
140    }
141}
142
/// Scaling configuration for the builder.
///
/// Records what the caller asked for; `EventBusConfigBuilder::build`
/// turns it into the final `Option<ScalingPolicy>`.
#[derive(Debug, Clone)]
enum ScalingConfig {
    /// Use default policy based on num_shards (resolved at build time).
    /// `build` treats an unset scaling choice the same way.
    Default,
    /// Explicitly disabled.
    /// `build` maps this to `scaling: None`.
    Disabled,
    /// Explicit policy.
    /// `build` passes it through unchanged.
    Policy(ScalingPolicy),
}
153
/// Builder for EventBusConfig.
///
/// Every field starts as `None` (via the derived `Default`); `build`
/// substitutes the default for anything still unset and then validates
/// the result.
#[derive(Debug, Default)]
pub struct EventBusConfigBuilder {
    // `None` => number of available CPUs at build time.
    num_shards: Option<u16>,
    // `None` => 1 << 20.
    ring_buffer_capacity: Option<usize>,
    // `None` => `BackpressureMode::DropOldest`.
    backpressure_mode: Option<BackpressureMode>,
    // `None` => `BatchConfig::default()`.
    batch: Option<BatchConfig>,
    // `None` => `AdapterConfig::Noop`.
    adapter: Option<AdapterConfig>,
    // `None` => same as `ScalingConfig::Default`.
    scaling: Option<ScalingConfig>,
    // `None` => 30 seconds.
    adapter_timeout: Option<Duration>,
    // `None` => 0 retries.
    adapter_batch_retries: Option<u32>,
    // Passed through as-is; `None` leaves the nonce path unset.
    producer_nonce_path: Option<PathBuf>,
}
167
168impl EventBusConfigBuilder {
169    /// Set the number of shards.
170    pub fn num_shards(mut self, n: u16) -> Self {
171        self.num_shards = Some(n);
172        self
173    }
174
175    /// Set the ring buffer capacity per shard.
176    pub fn ring_buffer_capacity(mut self, cap: usize) -> Self {
177        self.ring_buffer_capacity = Some(cap);
178        self
179    }
180
181    /// Set the backpressure mode.
182    pub fn backpressure_mode(mut self, mode: BackpressureMode) -> Self {
183        self.backpressure_mode = Some(mode);
184        self
185    }
186
187    /// Set the batch configuration.
188    pub fn batch(mut self, config: BatchConfig) -> Self {
189        self.batch = Some(config);
190        self
191    }
192
193    /// Set the adapter configuration.
194    pub fn adapter(mut self, config: AdapterConfig) -> Self {
195        self.adapter = Some(config);
196        self
197    }
198
199    /// Enable dynamic scaling with the given policy.
200    pub fn scaling(mut self, policy: ScalingPolicy) -> Self {
201        self.scaling = Some(ScalingConfig::Policy(policy));
202        self
203    }
204
205    /// Enable dynamic scaling with default policy.
206    /// The policy's max_shards will be based on num_shards (resolved at build time).
207    pub fn with_dynamic_scaling(mut self) -> Self {
208        self.scaling = Some(ScalingConfig::Default);
209        self
210    }
211
212    /// Disable dynamic scaling (use fixed shard count).
213    pub fn without_scaling(mut self) -> Self {
214        self.scaling = Some(ScalingConfig::Disabled);
215        self
216    }
217
218    /// Set the adapter operation timeout.
219    pub fn adapter_timeout(mut self, timeout: Duration) -> Self {
220        self.adapter_timeout = Some(timeout);
221        self
222    }
223
224    /// Set the number of retries for failed on_batch calls.
225    /// 0 = no retries (default). Useful for Redis/JetStream under intermittent failures.
226    pub fn adapter_batch_retries(mut self, retries: u32) -> Self {
227        self.adapter_batch_retries = Some(retries);
228        self
229    }
230
231    /// Persist the producer nonce at `path` so it survives process
232    /// restart. Recommended for any deployment using
233    /// JetStream / Redis adapters where retries-after-crash should
234    /// dedup against the prior incarnation. See
235    /// `EventBusConfig::producer_nonce_path` for the full doc.
236    pub fn producer_nonce_path(mut self, path: impl Into<PathBuf>) -> Self {
237        self.producer_nonce_path = Some(path.into());
238        self
239    }
240
241    /// Build the configuration, validating all settings.
242    pub fn build(self) -> Result<EventBusConfig, ConfigError> {
243        let num_shards = self.num_shards.unwrap_or_else(num_cpus);
244        let scaling = match self.scaling {
245            Some(ScalingConfig::Policy(policy)) => Some(policy),
246            Some(ScalingConfig::Default) | None => {
247                Some(ScalingPolicy::default_for_cpus(num_shards))
248            }
249            Some(ScalingConfig::Disabled) => None,
250        };
251        let config = EventBusConfig {
252            num_shards,
253            ring_buffer_capacity: self.ring_buffer_capacity.unwrap_or(1 << 20),
254            backpressure_mode: self
255                .backpressure_mode
256                .unwrap_or(BackpressureMode::DropOldest),
257            batch: self.batch.unwrap_or_default(),
258            adapter: self.adapter.unwrap_or(AdapterConfig::Noop),
259            scaling,
260            adapter_timeout: self.adapter_timeout.unwrap_or(Duration::from_secs(30)),
261            adapter_batch_retries: self.adapter_batch_retries.unwrap_or(0),
262            producer_nonce_path: self.producer_nonce_path,
263        };
264        config.validate()?;
265        Ok(config)
266    }
267}
268
/// Backpressure policy when ring buffers are full.
///
/// `EventBusConfig` defaults to [`BackpressureMode::DropOldest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BackpressureMode {
    /// Drop the newest event (the one being inserted).
    DropNewest,

    /// Drop the oldest event in the buffer to make room.
    DropOldest,

    /// Return an error to the producer.
    FailProducer,

    /// Sample events: keep 1 in N events.
    Sample {
        /// Keep 1 event for every `rate` events.
        /// Must be non-zero; `EventBusConfig::validate` rejects `rate: 0`.
        rate: u32,
    },
}
287
/// Batch aggregation configuration.
///
/// Constraints noted on the fields are enforced by `BatchConfig::validate`.
#[derive(Debug, Clone)]
pub struct BatchConfig {
    /// Minimum batch size (floor for adaptive sizing).
    /// Must be greater than zero.
    /// Default: 1,000 events.
    pub min_size: usize,

    /// Maximum batch size (ceiling for adaptive sizing).
    /// Must be at least `min_size` and at most
    /// `BatchConfig::MAX_BATCH_SIZE_LIMIT`.
    /// Default: 10,000 events.
    pub max_size: usize,

    /// Maximum time to wait before flushing a partial batch.
    /// Must be non-zero.
    /// Default: 10ms.
    pub max_delay: Duration,

    /// Enable adaptive batch sizing based on ingestion velocity.
    /// Default: true.
    pub adaptive: bool,

    /// Window size for velocity calculation (adaptive mode).
    /// Must be non-zero when `adaptive` is true; not checked otherwise.
    /// Default: 100ms.
    pub velocity_window: Duration,
}
311
312impl Default for BatchConfig {
313    fn default() -> Self {
314        Self {
315            min_size: 1_000,
316            max_size: 10_000,
317            max_delay: Duration::from_millis(10),
318            adaptive: true,
319            velocity_window: Duration::from_millis(100),
320        }
321    }
322}
323
324impl BatchConfig {
325    /// Upper bound on `max_size`. The adaptive-batching code in
326    /// `shard/batch.rs` uses arithmetic like
327    /// `current_batch_size * 3 + target` against `max_size`-clamped
328    /// values; with `max_size = usize::MAX` the multiplication
329    /// panics in debug builds and wraps in release. The default
330    /// production `max_size = 10_000` is far below this cap, so
331    /// this is purely a hostile-config guard. Set well above any
332    /// plausible workload (`high_throughput` ships `50_000`) but
333    /// well below the arithmetic-blast radius.
334    pub const MAX_BATCH_SIZE_LIMIT: usize = 1_000_000;
335
336    /// Validate the batch configuration.
337    pub fn validate(&self) -> Result<(), ConfigError> {
338        if self.min_size == 0 {
339            return Err(ConfigError::InvalidValue("min_size must be > 0".into()));
340        }
341        if self.max_size < self.min_size {
342            return Err(ConfigError::InvalidValue(
343                "max_size must be >= min_size".into(),
344            ));
345        }
346        if self.max_size > Self::MAX_BATCH_SIZE_LIMIT {
347            return Err(ConfigError::InvalidValue(format!(
348                "max_size must be <= {} (hostile-config guard against \
349                 `current_batch_size * 3 + target` overflow in adaptive batching)",
350                Self::MAX_BATCH_SIZE_LIMIT,
351            )));
352        }
353        if self.max_delay.is_zero() {
354            return Err(ConfigError::InvalidValue("max_delay must be > 0".into()));
355        }
356        // Zero `velocity_window` div-by-zeros the throughput
357        // calculator when adaptive batching is enabled. Validate
358        // only when the field is actually consulted.
359        if self.adaptive && self.velocity_window.is_zero() {
360            return Err(ConfigError::InvalidValue(
361                "velocity_window must be > 0 when adaptive batching is enabled".into(),
362            ));
363        }
364        Ok(())
365    }
366
367    /// Create a high-throughput configuration optimized for Blackwell workloads.
368    pub fn high_throughput() -> Self {
369        Self {
370            min_size: 5_000,
371            max_size: 50_000,
372            max_delay: Duration::from_millis(5),
373            adaptive: true,
374            velocity_window: Duration::from_millis(50),
375        }
376    }
377
378    /// Create a low-latency configuration for interactive workloads.
379    pub fn low_latency() -> Self {
380        Self {
381            min_size: 100,
382            max_size: 1_000,
383            max_delay: Duration::from_millis(1),
384            adaptive: true,
385            velocity_window: Duration::from_millis(20),
386        }
387    }
388}
389
/// Adapter configuration.
///
/// Every variant except `Noop` is compiled in only when the matching
/// Cargo feature is enabled.
#[derive(Debug, Clone)]
pub enum AdapterConfig {
    /// No-op adapter (events are discarded after batching).
    /// Useful for testing and benchmarking.
    /// This is the adapter `EventBusConfig` uses by default.
    Noop,

    /// Redis Streams adapter.
    /// Requires the `redis` feature.
    #[cfg(feature = "redis")]
    Redis(RedisAdapterConfig),

    /// NATS JetStream adapter.
    /// Requires the `jetstream` feature.
    #[cfg(feature = "jetstream")]
    JetStream(JetStreamAdapterConfig),

    /// Net (Net L0 Transport Protocol) adapter.
    /// High-performance UDP transport for GPU-to-GPU communication.
    /// Requires the `net` feature. The configuration is boxed.
    #[cfg(feature = "net")]
    Net(Box<crate::adapter::net::NetAdapterConfig>),
}
410
/// Redis adapter configuration.
///
/// Build one with `RedisAdapterConfig::new` and the `with_*` setters.
#[cfg(feature = "redis")]
#[derive(Debug, Clone)]
pub struct RedisAdapterConfig {
    /// Redis connection URL.
    /// Example: "redis://localhost:6379"
    /// Must be non-empty (checked by `validate`).
    pub url: String,

    /// Stream key prefix.
    /// Streams are named: "{prefix}:shard:{shard_id}"
    /// Default: "net"
    pub prefix: String,

    /// Maximum commands per pipeline.
    /// Must be greater than zero (checked by `validate`).
    /// Default: 1000.
    pub pipeline_size: usize,

    /// Connection pool size.
    /// Default: number of shards.
    /// `new` leaves this as `None`.
    pub pool_size: Option<usize>,

    /// Connection timeout.
    /// Must be non-zero (checked by `validate`).
    /// Default: 5 seconds.
    pub connect_timeout: Duration,

    /// Command timeout.
    /// Must be non-zero (checked by `validate`).
    /// Default: 1 second.
    pub command_timeout: Duration,

    /// Maximum stream length (MAXLEN for XADD).
    /// None = unlimited.
    pub max_stream_len: Option<usize>,
}
444
#[cfg(feature = "redis")]
impl RedisAdapterConfig {
    /// Create a new Redis adapter configuration for `url`.
    ///
    /// All other fields start at their defaults: prefix `"net"`,
    /// pipeline size 1000, no explicit pool size, 5s connect timeout,
    /// 1s command timeout, and no stream length cap.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            prefix: "net".into(),
            pipeline_size: 1000,
            pool_size: None,
            connect_timeout: Duration::from_secs(5),
            command_timeout: Duration::from_secs(1),
            max_stream_len: None,
        }
    }

    /// Set the stream key prefix.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Set the pipeline size. Must be greater than zero to pass
    /// [`Self::validate`].
    pub fn with_pipeline_size(mut self, size: usize) -> Self {
        self.pipeline_size = size;
        self
    }

    /// Set the connection pool size. Must be greater than zero to pass
    /// [`Self::validate`].
    pub fn with_pool_size(mut self, size: usize) -> Self {
        self.pool_size = Some(size);
        self
    }

    /// Set the connection timeout. Must be non-zero to pass
    /// [`Self::validate`].
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set the command timeout. Must be non-zero to pass
    /// [`Self::validate`].
    pub fn with_command_timeout(mut self, timeout: Duration) -> Self {
        self.command_timeout = timeout;
        self
    }

    /// Set the maximum stream length.
    pub fn with_max_stream_len(mut self, len: usize) -> Self {
        self.max_stream_len = Some(len);
        self
    }

    /// Validate the configuration. Called from
    /// `EventBusConfig::validate` so adapter misconfiguration is
    /// caught at startup rather than at the first batch dispatch.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::InvalidValue`] for the first violated rule:
    /// empty `url`, zero `pipeline_size`, zero `connect_timeout`, zero
    /// `command_timeout`, or an explicit `pool_size` of zero.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.url.is_empty() {
            return Err(ConfigError::InvalidValue(
                "redis url must be non-empty".into(),
            ));
        }
        if self.pipeline_size == 0 {
            return Err(ConfigError::InvalidValue(
                "redis pipeline_size must be > 0".into(),
            ));
        }
        if self.connect_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "redis connect_timeout must be > 0".into(),
            ));
        }
        if self.command_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "redis command_timeout must be > 0".into(),
            ));
        }
        // An explicitly configured pool of zero connections can never
        // hand out a connection. `None` (use the default size) stays
        // valid. Checked last so configs that already failed one of the
        // checks above keep reporting the same error.
        if self.pool_size == Some(0) {
            return Err(ConfigError::InvalidValue(
                "redis pool_size must be > 0 when set".into(),
            ));
        }
        Ok(())
    }
}
523
/// NATS JetStream adapter configuration.
///
/// Build one with `JetStreamAdapterConfig::new` and the `with_*` setters.
#[cfg(feature = "jetstream")]
#[derive(Debug, Clone)]
pub struct JetStreamAdapterConfig {
    /// NATS server URL.
    /// Example: "nats://localhost:4222"
    /// Must be non-empty (checked by `validate`).
    pub url: String,

    /// Stream name prefix.
    /// Streams are named: "{prefix}_shard_{shard_id}"
    /// Default: "net"
    pub prefix: String,

    /// Connection timeout.
    /// Must be non-zero (checked by `validate`).
    /// Default: 5 seconds.
    pub connect_timeout: Duration,

    /// Request timeout for publish/fetch operations.
    /// Must be non-zero (checked by `validate`).
    /// Default: 5 seconds.
    pub request_timeout: Duration,

    /// Maximum messages per stream (oldest are discarded when exceeded).
    /// None = unlimited.
    /// Negative values are rejected by `validate`.
    pub max_messages: Option<i64>,

    /// Maximum bytes per stream.
    /// None = unlimited.
    /// Negative values are rejected by `validate`.
    pub max_bytes: Option<i64>,

    /// Maximum age for messages in the stream.
    /// None = unlimited.
    pub max_age: Option<Duration>,

    /// Number of stream replicas for fault tolerance.
    /// Must be at least 1 (checked by `validate`).
    /// Default: 1 (no replication).
    pub replicas: usize,

    /// Server-side dedup window for `Nats-Msg-Id` header matching.
    /// JetStream discards a publish whose msg-id matches one
    /// observed within this window — the bus's `on_batch` retry
    /// path relies on this to make mid-batch failures idempotent.
    /// Default: 1 hour.
    ///
    /// The NATS / async-nats default is 2 minutes. Under the bus's
    /// retry policy a slow caller (network flap, long backoff,
    /// queued-up backpressure) can land the same `(nonce, shard,
    /// seq)` msg-id past the 2 min mark, where dedup no longer
    /// fires and the same event publishes at two distinct
    /// JetStream sequences. 1 hour is wider than any realistic
    /// retry envelope while bounding server-side dedup-table
    /// memory growth (one entry per unique msg-id observed within
    /// the window).
    pub dedup_window: Duration,
}
578
#[cfg(feature = "jetstream")]
impl JetStreamAdapterConfig {
    /// Largest replica count accepted by [`Self::validate`].
    ///
    /// NATS JetStream caps a stream's replica count at 5 and refuses to
    /// create a stream that asks for more, so larger values can only
    /// fail later at stream-create time.
    pub const MAX_REPLICAS: usize = 5;

    /// Create a new JetStream adapter configuration for `url`.
    ///
    /// All other fields start at their defaults: prefix `"net"`, 5s
    /// connect and request timeouts, no message/byte/age limits, a
    /// single replica, and a 1 hour dedup window.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            prefix: "net".into(),
            connect_timeout: Duration::from_secs(5),
            request_timeout: Duration::from_secs(5),
            max_messages: None,
            max_bytes: None,
            max_age: None,
            replicas: 1,
            dedup_window: Duration::from_secs(3600),
        }
    }

    /// Set the JetStream dedup window. See the field doc on
    /// [`Self::dedup_window`] for the trade-off vs the NATS default.
    pub fn with_dedup_window(mut self, window: Duration) -> Self {
        self.dedup_window = window;
        self
    }

    /// Set the stream name prefix.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Set the connection timeout. Must be non-zero to pass
    /// [`Self::validate`].
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set the request timeout. Must be non-zero to pass
    /// [`Self::validate`].
    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Set the maximum messages per stream. Negative values are stored
    /// here but rejected by [`Self::validate`].
    pub fn with_max_messages(mut self, max: i64) -> Self {
        self.max_messages = Some(max);
        self
    }

    /// Set the maximum bytes per stream. Negative values are stored
    /// here but rejected by [`Self::validate`].
    pub fn with_max_bytes(mut self, max: i64) -> Self {
        self.max_bytes = Some(max);
        self
    }

    /// Set the maximum age for messages.
    pub fn with_max_age(mut self, age: Duration) -> Self {
        self.max_age = Some(age);
        self
    }

    /// Set the number of replicas. Values outside
    /// `1..=`[`Self::MAX_REPLICAS`] are rejected by [`Self::validate`].
    pub fn with_replicas(mut self, replicas: usize) -> Self {
        self.replicas = replicas;
        self
    }

    /// Validate the configuration. Called from
    /// `EventBusConfig::validate` so adapter misconfiguration is
    /// caught at startup rather than at the first batch dispatch.
    ///
    /// # Errors
    ///
    /// Returns [`ConfigError::InvalidValue`] for the first violated rule:
    /// empty `url`, zero `connect_timeout`, zero `request_timeout`, zero
    /// `replicas`, negative `max_messages`, negative `max_bytes`, or
    /// `replicas` above [`Self::MAX_REPLICAS`].
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.url.is_empty() {
            return Err(ConfigError::InvalidValue(
                "jetstream url must be non-empty".into(),
            ));
        }
        if self.connect_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "jetstream connect_timeout must be > 0".into(),
            ));
        }
        if self.request_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "jetstream request_timeout must be > 0".into(),
            ));
        }
        if self.replicas == 0 {
            return Err(ConfigError::InvalidValue(
                "jetstream replicas must be >= 1".into(),
            ));
        }
        // NATS rejects negative `max_messages` / `max_bytes` at
        // stream-create time, surfacing as a runtime adapter error
        // instead of at startup `validate()` (the documented
        // purpose of this method). The fields are typed `i64` for
        // wire-compat with the NATS API but only non-negative
        // values make sense — a builder's `with_max_messages(-1)`
        // would happily store the negative and explode minutes
        // later. Reject at validation time so the misconfig is
        // caught before any connection attempt.
        if let Some(n) = self.max_messages {
            if n < 0 {
                return Err(ConfigError::InvalidValue(format!(
                    "jetstream max_messages must be non-negative (got {n})"
                )));
            }
        }
        if let Some(n) = self.max_bytes {
            if n < 0 {
                return Err(ConfigError::InvalidValue(format!(
                    "jetstream max_bytes must be non-negative (got {n})"
                )));
            }
        }
        // Same reasoning for the replica ceiling: the server refuses
        // streams with more than `MAX_REPLICAS` replicas, so catch it
        // here. Checked last so configs that already failed one of the
        // checks above keep reporting the same error.
        if self.replicas > Self::MAX_REPLICAS {
            return Err(ConfigError::InvalidValue(format!(
                "jetstream replicas must be <= {} (got {})",
                Self::MAX_REPLICAS,
                self.replicas
            )));
        }
        Ok(())
    }
}
695
/// Errors produced while validating configuration.
#[derive(Debug, Clone)]
pub enum ConfigError {
    /// A field holds a value outside its allowed range; the payload is a
    /// human-readable description of the violated constraint.
    InvalidValue(String),
}

impl std::fmt::Display for ConfigError {
    /// Renders as `invalid configuration: <message>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Single-variant enum, so the pattern is irrefutable; adding a
        // variant later turns this into a compile error rather than a
        // silently missing case.
        let Self::InvalidValue(reason) = self;
        write!(f, "invalid configuration: {reason}")
    }
}

impl std::error::Error for ConfigError {}
712
/// Policy configuration for dynamic shard scaling.
///
/// Constraints noted on the fields are enforced by
/// `ScalingPolicy::validate`. The "Default" values are those produced by
/// `ScalingPolicy::default_for_cpus`.
#[derive(Debug, Clone)]
pub struct ScalingPolicy {
    /// Fill ratio threshold to trigger scale-up (0.0 - 1.0).
    /// Must be finite and in `(0.0, 1.0]`.
    /// Default: 0.7 (70%)
    pub fill_ratio_threshold: f64,

    /// Push latency threshold in nanoseconds to trigger scale-up.
    /// Default: 5ns (after this, we're seeing contention)
    pub push_latency_threshold_ns: u64,

    /// Batch flush latency threshold in microseconds to trigger scale-up.
    /// Default: 1000μs (1ms)
    pub flush_latency_threshold_us: u64,

    /// Minimum number of shards (floor for scaling down).
    /// Must be greater than zero.
    /// Default: 1
    pub min_shards: u16,

    /// Maximum number of shards (ceiling for scaling up).
    /// Must be at least `min_shards`.
    /// Default: the CPU count passed to `default_for_cpus`.
    pub max_shards: u16,

    /// Cooldown period between scaling operations.
    /// Must be non-zero.
    /// Default: 1 second
    pub cooldown: Duration,

    /// How long a shard must be underutilized before scaling down.
    /// Must be non-zero.
    /// Default: 10 seconds
    pub scale_down_delay: Duration,

    /// Fill ratio below which a shard is considered underutilized.
    /// Must be finite and in `[0.0, 1.0]`.
    /// Default: 0.1 (10%)
    pub underutilized_threshold: f64,

    /// Metrics collection window.
    /// Must be non-zero.
    /// Default: 100ms
    pub metrics_window: Duration,

    /// Enable automatic scaling (if false, scaling is manual only).
    /// Default: true
    pub auto_scale: bool,
}
753
754impl Default for ScalingPolicy {
755    fn default() -> Self {
756        Self::default_for_cpus(num_cpus())
757    }
758}
759
760impl ScalingPolicy {
761    /// Create a default scaling policy based on CPU count.
762    /// Scales from 1 shard up to the number of physical cores.
763    pub fn default_for_cpus(cpus: u16) -> Self {
764        Self {
765            fill_ratio_threshold: 0.7,
766            push_latency_threshold_ns: 5,
767            flush_latency_threshold_us: 1000,
768            min_shards: 1,
769            max_shards: cpus,
770            cooldown: Duration::from_secs(1),
771            scale_down_delay: Duration::from_secs(10),
772            underutilized_threshold: 0.1,
773            metrics_window: Duration::from_millis(100),
774            auto_scale: true,
775        }
776    }
777
778    /// Create a policy optimized for high-throughput GPU workloads.
779    /// Uses more aggressive scaling with higher max shard count.
780    ///
781    /// `max_shards` is capped at `u16::MAX` (65 535) because shard
782    /// ids are 16-bit. On hosts with more than 32 767 CPUs the
783    /// "2× CPU count" target saturates rather than wraps — this is
784    /// the intended behavior (pre-fix this was just an implicit
785    /// `saturating_mul` artifact; the cap is now documented and
786    /// the saturation is explicit).
787    pub fn high_throughput() -> Self {
788        let cpus = num_cpus();
789        Self {
790            fill_ratio_threshold: 0.6,
791            push_latency_threshold_ns: 3,
792            flush_latency_threshold_us: 500,
793            min_shards: 4.min(cpus),
794            // `saturating_mul` clamps at u16::MAX (65 535).
795            // Documented cap; not silently wrapped.
796            max_shards: cpus.saturating_mul(2),
797            cooldown: Duration::from_millis(500),
798            scale_down_delay: Duration::from_secs(30),
799            underutilized_threshold: 0.05,
800            metrics_window: Duration::from_millis(50),
801            auto_scale: true,
802        }
803    }
804
805    /// Create a conservative policy for stable workloads.
806    pub fn conservative() -> Self {
807        let cpus = num_cpus();
808        Self {
809            fill_ratio_threshold: 0.8,
810            push_latency_threshold_ns: 10,
811            flush_latency_threshold_us: 2000,
812            min_shards: 1,
813            max_shards: cpus,
814            cooldown: Duration::from_secs(5),
815            scale_down_delay: Duration::from_secs(60),
816            underutilized_threshold: 0.05,
817            metrics_window: Duration::from_millis(200),
818            auto_scale: true,
819        }
820    }
821
822    /// Normalize the policy by auto-adjusting conflicting values.
823    ///
824    /// This allows users to set either `min_shards` or `max_shards` independently
825    /// without worrying about the other. If `max_shards < min_shards`, `max_shards`
826    /// is adjusted to equal `min_shards`.
827    pub fn normalize(mut self) -> Self {
828        if self.max_shards < self.min_shards {
829            self.max_shards = self.min_shards;
830        }
831        self
832    }
833
834    /// Validate the policy.
835    ///
836    /// `is_finite()` guards reject NaN and ±∞ explicitly before
837    /// the range check runs. NaN thresholds slip past raw `<=` /
838    /// `>` comparisons (every comparison against `f64::NaN`
839    /// returns `false`), so a config deserialized from
840    /// `0.0/0.0`-style arithmetic or an unfortunate
841    /// environment-templated string would "validate" successfully
842    /// and then sit inert at runtime — `mapper.rs:560` does
843    /// `m.fill_ratio > self.policy.fill_ratio_threshold`, which is
844    /// always `false` against NaN, so the scaler would never fire
845    /// (mirror hazard for scale-down).
846    pub fn validate(&self) -> Result<(), ConfigError> {
847        if !self.fill_ratio_threshold.is_finite() {
848            return Err(ConfigError::InvalidValue(
849                "fill_ratio_threshold must be finite (NaN/±inf rejected)".into(),
850            ));
851        }
852        if self.fill_ratio_threshold <= 0.0 || self.fill_ratio_threshold > 1.0 {
853            return Err(ConfigError::InvalidValue(
854                "fill_ratio_threshold must be in (0.0, 1.0]".into(),
855            ));
856        }
857        if !self.underutilized_threshold.is_finite() {
858            return Err(ConfigError::InvalidValue(
859                "underutilized_threshold must be finite (NaN/±inf rejected)".into(),
860            ));
861        }
862        if self.underutilized_threshold < 0.0 || self.underutilized_threshold > 1.0 {
863            return Err(ConfigError::InvalidValue(
864                "underutilized_threshold must be in [0.0, 1.0]".into(),
865            ));
866        }
867        if self.min_shards == 0 {
868            return Err(ConfigError::InvalidValue("min_shards must be > 0".into()));
869        }
870        if self.max_shards < self.min_shards {
871            return Err(ConfigError::InvalidValue(
872                "max_shards must be >= min_shards".into(),
873            ));
874        }
875        // Zero durations on the scaling path either div-by-zero
876        // (`metrics_window`), thrash the scaler (`cooldown`), or scale
877        // down on the first underutilized sample (`scale_down_delay`).
878        // Reject all three at config time.
879        if self.cooldown.is_zero() {
880            return Err(ConfigError::InvalidValue("cooldown must be > 0".into()));
881        }
882        if self.metrics_window.is_zero() {
883            return Err(ConfigError::InvalidValue(
884                "metrics_window must be > 0".into(),
885            ));
886        }
887        if self.scale_down_delay.is_zero() {
888            return Err(ConfigError::InvalidValue(
889                "scale_down_delay must be > 0".into(),
890            ));
891        }
892        Ok(())
893    }
894}
895
/// Parallelism available to this process, as a `u16`.
///
/// Reads `std::thread::available_parallelism`. A value too large for `u16`
/// saturates at `u16::MAX`; if the query fails, 1 is returned.
fn num_cpus() -> u16 {
    match std::thread::available_parallelism() {
        Ok(cores) => u16::try_from(cores.get()).unwrap_or(u16::MAX),
        Err(_) => 1,
    }
}
902
#[cfg(test)]
mod tests {
    use super::*;

    /// Default `ScalingPolicy` with only `fill_ratio_threshold` replaced.
    fn policy_with_fill_ratio(value: f64) -> ScalingPolicy {
        ScalingPolicy {
            fill_ratio_threshold: value,
            ..Default::default()
        }
    }

    /// Default `ScalingPolicy` with only `underutilized_threshold` replaced.
    fn policy_with_underutilized(value: f64) -> ScalingPolicy {
        ScalingPolicy {
            underutilized_threshold: value,
            ..Default::default()
        }
    }

    /// The default configuration validates, has at least one shard and a
    /// power-of-two ring buffer capacity.
    #[test]
    fn test_default_config() {
        let defaults = EventBusConfig::default();
        assert!(defaults.validate().is_ok());
        assert_ne!(defaults.num_shards, 0);
        assert!(defaults.ring_buffer_capacity.is_power_of_two());
    }

    /// Values handed to the builder show up on the built configuration.
    #[test]
    fn test_builder() {
        let built = EventBusConfig::builder()
            .num_shards(8)
            .ring_buffer_capacity(1 << 16)
            .backpressure_mode(BackpressureMode::FailProducer)
            .build()
            .unwrap();

        assert_eq!(built.num_shards, 8);
        assert_eq!(built.ring_buffer_capacity, 1usize << 16);
        assert_eq!(built.backpressure_mode, BackpressureMode::FailProducer);
    }

    /// A ring buffer capacity that is not a power of two fails the build.
    #[test]
    fn test_invalid_ring_buffer_capacity() {
        // 1000 is not a power of 2.
        let outcome = EventBusConfig::builder()
            .ring_buffer_capacity(1000)
            .build();

        assert!(outcome.is_err());
    }

    /// Both `BatchConfig` presets validate; the low-latency preset has the
    /// shorter `max_delay`.
    #[test]
    fn test_batch_config_presets() {
        let throughput = BatchConfig::high_throughput();
        assert!(throughput.validate().is_ok());
        assert!(throughput.min_size < throughput.max_size);

        let latency = BatchConfig::low_latency();
        assert!(latency.validate().is_ok());
        assert!(throughput.max_delay > latency.max_delay);
    }

    /// The default configuration carries an auto-scaling policy whose
    /// `max_shards` equals the configured shard count.
    #[test]
    fn test_scaling_enabled_by_default() {
        let EventBusConfig {
            scaling,
            num_shards,
            ..
        } = EventBusConfig::default();
        assert!(scaling.is_some());

        let policy = scaling.unwrap();
        assert_eq!(policy.max_shards, num_shards);
        assert!(policy.auto_scale);
    }

    /// A builder that only sets `num_shards` still produces a scaling policy
    /// sized to that shard count.
    #[test]
    fn test_builder_enables_scaling_by_default() {
        let scaling = EventBusConfig::builder()
            .num_shards(8)
            .build()
            .unwrap()
            .scaling;

        assert!(scaling.is_some());
        assert_eq!(scaling.unwrap().max_shards, 8);
    }

    /// `without_scaling()` leaves the built configuration with no policy.
    #[test]
    fn test_builder_without_scaling() {
        let scaling = EventBusConfig::builder()
            .num_shards(4)
            .without_scaling()
            .build()
            .unwrap()
            .scaling;

        assert!(scaling.is_none());
    }

    /// `with_dynamic_scaling()` takes `max_shards` from `num_shards` rather
    /// than the CPU count, whichever of the two builder calls comes first.
    #[test]
    fn test_with_dynamic_scaling_respects_num_shards() {
        let shards_then_scaling = EventBusConfig::builder()
            .num_shards(8)
            .with_dynamic_scaling()
            .build()
            .unwrap()
            .scaling;

        assert!(shards_then_scaling.is_some());
        assert_eq!(shards_then_scaling.unwrap().max_shards, 8);

        // Same expectation with the builder calls in the opposite order.
        let scaling_then_shards = EventBusConfig::builder()
            .with_dynamic_scaling()
            .num_shards(16)
            .build()
            .unwrap()
            .scaling;

        assert!(scaling_then_shards.is_some());
        assert_eq!(scaling_then_shards.unwrap().max_shards, 16);
    }

    /// Both `ScalingPolicy` presets validate; the conservative preset has
    /// the longer cooldown.
    #[test]
    fn test_scaling_policy_presets() {
        let throughput = ScalingPolicy::high_throughput();
        assert!(throughput.validate().is_ok());
        assert!(throughput.min_shards <= throughput.max_shards);

        let cautious = ScalingPolicy::conservative();
        assert!(cautious.validate().is_ok());
        assert!(throughput.cooldown < cautious.cooldown);
    }

    /// Range boundaries of the two threshold fields.
    #[test]
    fn test_scaling_policy_validation() {
        // `underutilized_threshold` accepts the closed interval [0.0, 1.0].
        assert!(policy_with_underutilized(0.0).validate().is_ok());
        assert!(policy_with_underutilized(0.5).validate().is_ok());
        assert!(policy_with_underutilized(1.0).validate().is_ok());

        // ...and nothing outside it.
        assert!(policy_with_underutilized(-0.1).validate().is_err());
        assert!(policy_with_underutilized(1.1).validate().is_err());

        // `fill_ratio_threshold` excludes 0.0 and anything above 1.0; the
        // other threshold is held at a valid value while it is probed.
        let probe_fill_ratio = |value: f64| ScalingPolicy {
            underutilized_threshold: 0.1,
            fill_ratio_threshold: value,
            ..Default::default()
        };
        assert!(probe_fill_ratio(0.0).validate().is_err());
        assert!(probe_fill_ratio(1.1).validate().is_err());
    }

    // ========================================================================
    // validate() must reject NaN / ±inf thresholds
    // ========================================================================

    /// A NaN `fill_ratio_threshold` must fail validation.
    ///
    /// Before the `is_finite()` guard existed, the raw `<=` / `>` range
    /// checks let NaN through because every comparison with NaN is `false`.
    /// The "validated" policy then sat inert at runtime, since the scaler's
    /// `m.fill_ratio > policy.fill_ratio_threshold` was always false.
    #[test]
    fn validate_rejects_nan_fill_ratio_threshold() {
        let outcome = policy_with_fill_ratio(f64::NAN).validate();
        assert!(
            outcome.is_err(),
            "NaN fill_ratio_threshold must be rejected",
        );
    }

    /// A NaN `underutilized_threshold` must fail validation, for the same
    /// reason as the fill-ratio case.
    #[test]
    fn validate_rejects_nan_underutilized_threshold() {
        let outcome = policy_with_underutilized(f64::NAN).validate();
        assert!(
            outcome.is_err(),
            "NaN underutilized_threshold must be rejected",
        );
    }

    /// `±inf` must fail validation for both threshold fields.
    ///
    /// The plain range checks already turn infinities away: `+inf > 1.0`,
    /// `-inf <= 0.0` and `-inf < 0.0` are all true. The explicit
    /// `is_finite()` guard pins that outcome so it does not depend on which
    /// bound check happens to fire.
    #[test]
    fn validate_rejects_infinity_thresholds() {
        assert!(policy_with_fill_ratio(f64::INFINITY).validate().is_err());
        assert!(policy_with_fill_ratio(f64::NEG_INFINITY)
            .validate()
            .is_err());

        assert!(policy_with_underutilized(f64::INFINITY)
            .validate()
            .is_err());
        assert!(policy_with_underutilized(f64::NEG_INFINITY)
            .validate()
            .is_err());
    }

    /// An invalid scaling policy makes the whole configuration build fail.
    #[test]
    fn test_config_validates_scaling_policy() {
        // min_shards above max_shards.
        let inverted_range = EventBusConfig::builder()
            .num_shards(4)
            .scaling(ScalingPolicy {
                min_shards: 10,
                max_shards: 5,
                ..Default::default()
            })
            .build();

        assert!(inverted_range.is_err());

        // fill_ratio_threshold above 1.0.
        let threshold_too_high = EventBusConfig::builder()
            .num_shards(4)
            .scaling(policy_with_fill_ratio(1.5))
            .build();

        assert!(threshold_too_high.is_err());
    }

    // Regression (BUGS_3 #7): high_throughput() used cpus * 2, which
    // overflows u16 on machines with >32K CPUs.
    #[test]
    fn test_high_throughput_max_shards_no_overflow() {
        let preset = ScalingPolicy::high_throughput();
        assert!(preset.min_shards <= preset.max_shards);
        assert!(preset.validate().is_ok());
    }

    /// Regression: BUG_REPORT.md #3 — a zero-rate `Sample` used to pass
    /// validation and then div-by-zero downstream.
    #[test]
    fn test_validate_rejects_sample_rate_zero() {
        let outcome = EventBusConfig::builder()
            .backpressure_mode(BackpressureMode::Sample { rate: 0 })
            .build();
        assert!(
            outcome.is_err(),
            "BackpressureMode::Sample.rate == 0 must reject"
        );
    }

    /// Regression: `BatchConfig::validate` must reject an unbounded
    /// `max_size`.
    ///
    /// The adaptive-batching code in `shard/batch.rs` does arithmetic like
    /// `current_batch_size * 3 + target` against `max_size`-clamped values.
    /// With `max_size = usize::MAX` that multiplication panics in debug
    /// builds and wraps in release. The default `max_size = 10_000` is safe;
    /// only a hostile config can trip the overflow, so validation caps the
    /// field at `MAX_BATCH_SIZE_LIMIT`.
    #[test]
    fn batch_config_rejects_max_size_above_limit() {
        let with_max_size = |max_size: usize| BatchConfig {
            max_size,
            ..Default::default()
        };

        // Exactly at the limit is still valid.
        assert!(
            with_max_size(BatchConfig::MAX_BATCH_SIZE_LIMIT)
                .validate()
                .is_ok(),
            "max_size at MAX_BATCH_SIZE_LIMIT must be valid"
        );

        // One past the limit is not.
        assert!(
            with_max_size(BatchConfig::MAX_BATCH_SIZE_LIMIT + 1)
                .validate()
                .is_err(),
            "max_size > MAX_BATCH_SIZE_LIMIT must reject — adaptive \
             arithmetic overflows past this cap"
        );

        // The pathological value from the audit's example.
        assert!(
            with_max_size(usize::MAX).validate().is_err(),
            "max_size = usize::MAX must reject (regression: \
             current_batch_size * 3 + target overflow)"
        );
    }

    /// Regression: BUG_REPORT.md #3 — a zero `velocity_window` with adaptive
    /// batching div-by-zero'd the throughput calculator.
    #[test]
    fn test_validate_rejects_zero_velocity_window_when_adaptive() {
        let zero_window = |adaptive: bool| BatchConfig {
            adaptive,
            velocity_window: Duration::ZERO,
            ..Default::default()
        };

        assert!(zero_window(true).validate().is_err());

        // With adaptive batching off the field is ignored.
        assert!(zero_window(false).validate().is_ok());
    }

    /// Regression: BUG_REPORT.md #3 — a zero `adapter_timeout` made every
    /// adapter call time out instantly.
    #[test]
    fn test_validate_rejects_zero_adapter_timeout() {
        let outcome = EventBusConfig {
            adapter_timeout: Duration::ZERO,
            ..Default::default()
        }
        .validate();
        assert!(outcome.is_err());
    }

    /// Regression: BUG_REPORT.md #3 — zero `ScalingPolicy` durations either
    /// div-by-zero'd, thrashed the scaler, or scaled down on the first
    /// underutilized sample.
    #[test]
    fn test_validate_rejects_zero_scaling_durations() {
        let zero_cooldown = ScalingPolicy {
            cooldown: Duration::ZERO,
            ..Default::default()
        };
        assert!(zero_cooldown.validate().is_err());

        let zero_metrics_window = ScalingPolicy {
            metrics_window: Duration::ZERO,
            ..Default::default()
        };
        assert!(zero_metrics_window.validate().is_err());

        let zero_scale_down_delay = ScalingPolicy {
            scale_down_delay: Duration::ZERO,
            ..Default::default()
        };
        assert!(zero_scale_down_delay.validate().is_err());
    }

    /// Regression: BUG_REPORT.md #3 — `RedisAdapterConfig` had no
    /// `validate()`, so `pipeline_size: 0` shipped through to a runtime
    /// panic.
    #[cfg(feature = "redis")]
    #[test]
    fn test_validate_redis_pipeline_size_zero_rejected() {
        let adapter = {
            let mut cfg = RedisAdapterConfig::new("redis://localhost:6379");
            cfg.pipeline_size = 0;
            AdapterConfig::Redis(cfg)
        };

        let outcome = EventBusConfig::builder().adapter(adapter).build();
        assert!(outcome.is_err(), "redis pipeline_size == 0 must reject");
    }

    /// Regression: BUG_REPORT.md #3 — `JetStreamAdapterConfig` had no
    /// `validate()` either.
    #[cfg(feature = "jetstream")]
    #[test]
    fn test_validate_jetstream_replicas_zero_rejected() {
        let adapter = {
            let mut cfg = JetStreamAdapterConfig::new("nats://localhost:4222");
            cfg.replicas = 0;
            AdapterConfig::JetStream(cfg)
        };

        let outcome = EventBusConfig::builder().adapter(adapter).build();
        assert!(outcome.is_err(), "jetstream replicas == 0 must reject");
    }

    /// `JetStreamAdapterConfig::validate` rejects a negative `max_messages`.
    ///
    /// NATS rejects negatives at stream-create time, so without a
    /// validate-time check the misconfiguration surfaces as a runtime
    /// adapter error minutes later.
    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_rejects_negative_max_messages() {
        let msg = JetStreamAdapterConfig::new("nats://localhost:4222")
            .with_max_messages(-1)
            .validate()
            .expect_err("negative max_messages must reject")
            .to_string();
        assert!(
            msg.contains("max_messages"),
            "error must mention the field, got: {msg}"
        );
    }

    /// `JetStreamAdapterConfig::validate` rejects a negative `max_bytes`,
    /// and the error names the field.
    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_rejects_negative_max_bytes() {
        let msg = JetStreamAdapterConfig::new("nats://localhost:4222")
            .with_max_bytes(-100)
            .validate()
            .expect_err("negative max_bytes must reject")
            .to_string();
        assert!(
            msg.contains("max_bytes"),
            "error must mention the field, got: {msg}"
        );
    }

    /// Zero and positive `max_messages` values both pass validation.
    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_accepts_zero_and_positive_max_messages() {
        let with_limit = |limit| {
            JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(limit)
        };

        assert!(with_limit(0).validate().is_ok(), "zero must be accepted");
        assert!(
            with_limit(1_000_000).validate().is_ok(),
            "positive must be accepted"
        );
    }
}