Skip to main content

net/
config.rs

//! Configuration types for the Net event bus.
2
3use std::path::PathBuf;
4use std::time::Duration;
5
6/// Top-level configuration for the event bus.
7#[derive(Debug, Clone)]
8pub struct EventBusConfig {
9    /// Number of shards for parallel ingestion.
10    /// Each shard has its own ring buffer and batch worker.
11    /// Default: number of CPU cores.
12    ///
13    /// Unless you're connected to an advanced AI orchestrator, swarm controller,
14    /// or a local Nvidia Blackwell GPU cluster, any number is fine - it will
15    /// revert to your physical core count by default.
16    pub num_shards: u16,
17
18    /// Capacity of each shard's ring buffer (number of events).
19    /// Must be a power of 2 for efficient modulo operations.
20    /// Default: 1,048,576 (1M events per shard).
21    pub ring_buffer_capacity: usize,
22
23    /// Backpressure policy when ring buffers are full.
24    pub backpressure_mode: BackpressureMode,
25
26    /// Batch aggregation configuration.
27    pub batch: BatchConfig,
28
29    /// Adapter configuration.
30    pub adapter: AdapterConfig,
31
32    /// Dynamic scaling configuration.
33    /// If None, dynamic scaling is disabled.
34    pub scaling: Option<ScalingPolicy>,
35
36    /// Timeout for adapter operations (init, on_batch, flush, shutdown).
37    /// Prevents a hanging adapter from blocking the event bus.
38    /// Default: 30 seconds.
39    pub adapter_timeout: Duration,
40
41    /// Number of times to retry a failed on_batch before dropping the batch.
42    /// 0 = no retries (drop immediately on failure, default).
43    /// Retries use a fixed 100ms delay between attempts.
44    pub adapter_batch_retries: u32,
45
46    /// File path for the persistent producer nonce. When
47    /// `Some`, the bus loads (or creates on first run) the u64
48    /// nonce at this path on startup and stamps it on every
49    /// outgoing batch. Adapters that key dedup on
50    /// `(producer_nonce, shard, sequence_start, i)` then dedup
51    /// retransmits across process restart — JetStream's
52    /// `Nats-Msg-Id` is the canonical example.
53    ///
54    /// When `None` (default), the bus uses a per-process nonce
55    /// sampled fresh at every startup (today's behavior). That's
56    /// fine for in-memory adapters and for any deployment where
57    /// "at-most-once across process restart" is acceptable;
58    /// production JetStream / Redis deployments should set this
59    /// to a stable path on local persistent storage.
60    pub producer_nonce_path: Option<PathBuf>,
61}
62
63impl Default for EventBusConfig {
64    fn default() -> Self {
65        let cpus = num_cpus();
66        Self {
67            num_shards: cpus,
68            ring_buffer_capacity: 1 << 20, // 1M events
69            backpressure_mode: BackpressureMode::DropOldest,
70            batch: BatchConfig::default(),
71            adapter: AdapterConfig::Noop,
72            scaling: Some(ScalingPolicy::default_for_cpus(cpus)),
73            adapter_timeout: Duration::from_secs(30),
74            adapter_batch_retries: 0,
75            producer_nonce_path: None,
76        }
77    }
78}
79
80impl EventBusConfig {
81    /// Create a new configuration builder.
82    pub fn builder() -> EventBusConfigBuilder {
83        EventBusConfigBuilder::default()
84    }
85
86    /// Validate the configuration.
87    pub fn validate(&self) -> Result<(), ConfigError> {
88        if self.num_shards == 0 {
89            return Err(ConfigError::InvalidValue("num_shards must be > 0".into()));
90        }
91        if !self.ring_buffer_capacity.is_power_of_two() {
92            return Err(ConfigError::InvalidValue(
93                "ring_buffer_capacity must be a power of 2".into(),
94            ));
95        }
96        if self.ring_buffer_capacity < 1024 {
97            return Err(ConfigError::InvalidValue(
98                "ring_buffer_capacity must be >= 1024".into(),
99            ));
100        }
101        // Adapter timeout of zero would make every adapter call time
102        // out instantly. Reject at config time.
103        if self.adapter_timeout.is_zero() {
104            return Err(ConfigError::InvalidValue(
105                "adapter_timeout must be > 0".into(),
106            ));
107        }
108        // `Sample { rate: 0 }` was accepted but crashed downstream
109        // sampling (counter % 0).
110        if let BackpressureMode::Sample { rate: 0 } = self.backpressure_mode {
111            return Err(ConfigError::InvalidValue(
112                "BackpressureMode::Sample.rate must be > 0".into(),
113            ));
114        }
115        self.batch.validate()?;
116        if let Some(ref scaling) = self.scaling {
117            scaling
118                .validate()
119                .map_err(|e| ConfigError::InvalidValue(format!("scaling policy: {}", e)))?;
120        }
121        // Recurse into adapter configs. Previously these were
122        // accepted blindly and zero-divisor fields like
123        // `RedisAdapterConfig::pipeline_size: 0` shipped through to
124        // runtime panics.
125        match &self.adapter {
126            AdapterConfig::Noop => {}
127            #[cfg(feature = "redis")]
128            AdapterConfig::Redis(c) => c
129                .validate()
130                .map_err(|e| ConfigError::InvalidValue(format!("redis adapter: {}", e)))?,
131            #[cfg(feature = "jetstream")]
132            AdapterConfig::JetStream(c) => c
133                .validate()
134                .map_err(|e| ConfigError::InvalidValue(format!("jetstream adapter: {}", e)))?,
135            #[cfg(feature = "net")]
136            AdapterConfig::Net(_) => {} // Net adapter has its own
137                                        // validation pipeline, not in scope here.
138        }
139        Ok(())
140    }
141}
142
143/// Scaling configuration for the builder.
144#[derive(Debug, Clone)]
145enum ScalingConfig {
146    /// Use default policy based on num_shards (resolved at build time).
147    Default,
148    /// Explicitly disabled.
149    Disabled,
150    /// Explicit policy.
151    Policy(ScalingPolicy),
152}
153
154/// Builder for EventBusConfig.
155#[derive(Debug, Default)]
156pub struct EventBusConfigBuilder {
157    num_shards: Option<u16>,
158    ring_buffer_capacity: Option<usize>,
159    backpressure_mode: Option<BackpressureMode>,
160    batch: Option<BatchConfig>,
161    adapter: Option<AdapterConfig>,
162    scaling: Option<ScalingConfig>,
163    adapter_timeout: Option<Duration>,
164    adapter_batch_retries: Option<u32>,
165    producer_nonce_path: Option<PathBuf>,
166}
167
168impl EventBusConfigBuilder {
169    /// Set the number of shards.
170    pub fn num_shards(mut self, n: u16) -> Self {
171        self.num_shards = Some(n);
172        self
173    }
174
175    /// Set the ring buffer capacity per shard.
176    pub fn ring_buffer_capacity(mut self, cap: usize) -> Self {
177        self.ring_buffer_capacity = Some(cap);
178        self
179    }
180
181    /// Set the backpressure mode.
182    pub fn backpressure_mode(mut self, mode: BackpressureMode) -> Self {
183        self.backpressure_mode = Some(mode);
184        self
185    }
186
187    /// Set the batch configuration.
188    pub fn batch(mut self, config: BatchConfig) -> Self {
189        self.batch = Some(config);
190        self
191    }
192
193    /// Set the adapter configuration.
194    pub fn adapter(mut self, config: AdapterConfig) -> Self {
195        self.adapter = Some(config);
196        self
197    }
198
199    /// Enable dynamic scaling with the given policy.
200    pub fn scaling(mut self, policy: ScalingPolicy) -> Self {
201        self.scaling = Some(ScalingConfig::Policy(policy));
202        self
203    }
204
205    /// Enable dynamic scaling with default policy.
206    /// The policy's max_shards will be based on num_shards (resolved at build time).
207    pub fn with_dynamic_scaling(mut self) -> Self {
208        self.scaling = Some(ScalingConfig::Default);
209        self
210    }
211
212    /// Disable dynamic scaling (use fixed shard count).
213    pub fn without_scaling(mut self) -> Self {
214        self.scaling = Some(ScalingConfig::Disabled);
215        self
216    }
217
218    /// Set the adapter operation timeout.
219    pub fn adapter_timeout(mut self, timeout: Duration) -> Self {
220        self.adapter_timeout = Some(timeout);
221        self
222    }
223
224    /// Set the number of retries for failed on_batch calls.
225    /// 0 = no retries (default). Useful for Redis/JetStream under intermittent failures.
226    pub fn adapter_batch_retries(mut self, retries: u32) -> Self {
227        self.adapter_batch_retries = Some(retries);
228        self
229    }
230
231    /// Persist the producer nonce at `path` so it survives process
232    /// restart. Recommended for any deployment using
233    /// JetStream / Redis adapters where retries-after-crash should
234    /// dedup against the prior incarnation. See
235    /// `EventBusConfig::producer_nonce_path` for the full doc.
236    pub fn producer_nonce_path(mut self, path: impl Into<PathBuf>) -> Self {
237        self.producer_nonce_path = Some(path.into());
238        self
239    }
240
241    /// Build the configuration, validating all settings.
242    pub fn build(self) -> Result<EventBusConfig, ConfigError> {
243        let num_shards = self.num_shards.unwrap_or_else(num_cpus);
244        let scaling = match self.scaling {
245            Some(ScalingConfig::Policy(policy)) => Some(policy),
246            Some(ScalingConfig::Default) | None => {
247                Some(ScalingPolicy::default_for_cpus(num_shards))
248            }
249            Some(ScalingConfig::Disabled) => None,
250        };
251        let config = EventBusConfig {
252            num_shards,
253            ring_buffer_capacity: self.ring_buffer_capacity.unwrap_or(1 << 20),
254            backpressure_mode: self
255                .backpressure_mode
256                .unwrap_or(BackpressureMode::DropOldest),
257            batch: self.batch.unwrap_or_default(),
258            adapter: self.adapter.unwrap_or(AdapterConfig::Noop),
259            scaling,
260            adapter_timeout: self.adapter_timeout.unwrap_or(Duration::from_secs(30)),
261            adapter_batch_retries: self.adapter_batch_retries.unwrap_or(0),
262            producer_nonce_path: self.producer_nonce_path,
263        };
264        config.validate()?;
265        Ok(config)
266    }
267}
268
/// What a shard does when its ring buffer has no free slot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BackpressureMode {
    /// Discard the incoming event (the one being inserted).
    DropNewest,

    /// Evict the oldest buffered event to make room for the new one.
    DropOldest,

    /// Reject the insert and report an error to the producer.
    FailProducer,

    /// Keep only one event out of every `rate`.
    Sample {
        /// Sampling divisor: one event is kept per `rate` events.
        /// `EventBusConfig::validate` rejects 0.
        rate: u32,
    },
}
287
/// Batch aggregation configuration.
///
/// Controls how each shard's batch worker groups events before they are
/// dispatched. Check consistency with [`BatchConfig::validate`].
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Smallest batch size; the lower bound for adaptive sizing.
    /// Default: 1,000 events.
    pub min_size: usize,

    /// Largest batch size; the upper bound for adaptive sizing.
    /// Default: 10,000 events.
    pub max_size: usize,

    /// Longest a partial batch may wait before it is flushed.
    /// Default: 10ms.
    pub max_delay: Duration,

    /// Whether the batch size adapts to ingestion velocity.
    /// Default: true.
    pub adaptive: bool,

    /// Window over which ingestion velocity is measured (adaptive mode).
    /// Default: 100ms.
    pub velocity_window: Duration,
}

impl Default for BatchConfig {
    fn default() -> Self {
        let max_delay = Duration::from_millis(10);
        let velocity_window = Duration::from_millis(100);
        Self {
            min_size: 1_000,
            max_size: 10_000,
            max_delay,
            adaptive: true,
            velocity_window,
        }
    }
}
323
324impl BatchConfig {
325    /// Upper bound on `max_size`. The adaptive-batching code in
326    /// `shard/batch.rs` uses arithmetic like
327    /// `current_batch_size * 3 + target` against `max_size`-clamped
328    /// values; with `max_size = usize::MAX` the multiplication
329    /// panics in debug builds and wraps in release. The default
330    /// production `max_size = 10_000` is far below this cap, so
331    /// this is purely a hostile-config guard. Set well above any
332    /// plausible workload (`high_throughput` ships `50_000`) but
333    /// well below the arithmetic-blast radius.
334    pub const MAX_BATCH_SIZE_LIMIT: usize = 1_000_000;
335
336    /// Validate the batch configuration.
337    pub fn validate(&self) -> Result<(), ConfigError> {
338        if self.min_size == 0 {
339            return Err(ConfigError::InvalidValue("min_size must be > 0".into()));
340        }
341        if self.max_size < self.min_size {
342            return Err(ConfigError::InvalidValue(
343                "max_size must be >= min_size".into(),
344            ));
345        }
346        if self.max_size > Self::MAX_BATCH_SIZE_LIMIT {
347            return Err(ConfigError::InvalidValue(format!(
348                "max_size must be <= {} (hostile-config guard against \
349                 `current_batch_size * 3 + target` overflow in adaptive batching)",
350                Self::MAX_BATCH_SIZE_LIMIT,
351            )));
352        }
353        if self.max_delay.is_zero() {
354            return Err(ConfigError::InvalidValue("max_delay must be > 0".into()));
355        }
356        // Zero `velocity_window` div-by-zeros the throughput
357        // calculator when adaptive batching is enabled. Validate
358        // only when the field is actually consulted.
359        if self.adaptive && self.velocity_window.is_zero() {
360            return Err(ConfigError::InvalidValue(
361                "velocity_window must be > 0 when adaptive batching is enabled".into(),
362            ));
363        }
364        Ok(())
365    }
366
367    /// Create a high-throughput configuration optimized for Blackwell workloads.
368    pub fn high_throughput() -> Self {
369        Self {
370            min_size: 5_000,
371            max_size: 50_000,
372            max_delay: Duration::from_millis(5),
373            adaptive: true,
374            velocity_window: Duration::from_millis(50),
375        }
376    }
377
378    /// Create a low-latency configuration for interactive workloads.
379    pub fn low_latency() -> Self {
380        Self {
381            min_size: 100,
382            max_size: 1_000,
383            max_delay: Duration::from_millis(1),
384            adaptive: true,
385            velocity_window: Duration::from_millis(20),
386        }
387    }
388}
389
/// Selects the adapter that receives flushed batches.
///
/// Only `Noop` is always available; every other variant exists only when
/// its Cargo feature is enabled.
#[derive(Clone, Debug)]
pub enum AdapterConfig {
    /// Discards events after batching. Intended for tests and benchmarks.
    Noop,

    /// Redis Streams adapter (feature `redis`).
    #[cfg(feature = "redis")]
    Redis(RedisAdapterConfig),

    /// NATS JetStream adapter (feature `jetstream`).
    #[cfg(feature = "jetstream")]
    JetStream(JetStreamAdapterConfig),

    /// Net (Net L0 Transport Protocol) adapter (feature `net`): a UDP
    /// transport aimed at GPU-to-GPU communication. The config is boxed,
    /// presumably to keep this enum small — TODO confirm.
    #[cfg(feature = "net")]
    Net(Box<crate::adapter::net::NetAdapterConfig>),
}
410
/// Connection and stream settings for the Redis Streams adapter.
#[cfg(feature = "redis")]
#[derive(Clone, Debug)]
pub struct RedisAdapterConfig {
    /// Redis connection URL, e.g. `"redis://localhost:6379"`.
    pub url: String,

    /// Prefix for stream keys; each shard writes to
    /// `"{prefix}:shard:{shard_id}"`. Default: `"net"`.
    pub prefix: String,

    /// Upper bound on commands sent per pipeline. Default: 1000.
    pub pipeline_size: usize,

    /// Connection pool size. `None` (the default) leaves the choice to the
    /// adapter, documented as one connection per shard.
    pub pool_size: Option<usize>,

    /// Timeout for establishing a connection. Default: 5 seconds.
    pub connect_timeout: Duration,

    /// Timeout for an individual command. Default: 1 second.
    pub command_timeout: Duration,

    /// `MAXLEN` applied on `XADD`. `None` (the default) leaves streams
    /// unbounded.
    pub max_stream_len: Option<usize>,
}
444
#[cfg(feature = "redis")]
impl RedisAdapterConfig {
    /// Create a new Redis adapter configuration with default settings.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            prefix: "net".into(),
            pipeline_size: 1000,
            pool_size: None,
            connect_timeout: Duration::from_secs(5),
            command_timeout: Duration::from_secs(1),
            max_stream_len: None,
        }
    }

    /// Set the stream key prefix.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Set the pipeline size (must be > 0).
    pub fn with_pipeline_size(mut self, size: usize) -> Self {
        self.pipeline_size = size;
        self
    }

    /// Set the connection pool size (must be > 0).
    pub fn with_pool_size(mut self, size: usize) -> Self {
        self.pool_size = Some(size);
        self
    }

    /// Set the connection timeout (must be > 0).
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set the command timeout (must be > 0).
    pub fn with_command_timeout(mut self, timeout: Duration) -> Self {
        self.command_timeout = timeout;
        self
    }

    /// Set the maximum stream length.
    pub fn with_max_stream_len(mut self, len: usize) -> Self {
        self.max_stream_len = Some(len);
        self
    }

    /// Validate the configuration.
    ///
    /// Called from `EventBusConfig::validate` so adapter misconfiguration
    /// is caught at startup rather than at the first batch dispatch.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::InvalidValue` for an empty URL, a zero
    /// pipeline size, an explicit pool size of zero, or a zero connect or
    /// command timeout.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.url.is_empty() {
            return Err(ConfigError::InvalidValue(
                "redis url must be non-empty".into(),
            ));
        }
        if self.pipeline_size == 0 {
            return Err(ConfigError::InvalidValue(
                "redis pipeline_size must be > 0".into(),
            ));
        }
        // An explicit pool of zero connections can never serve a command.
        // Reject it at startup like the other zero-valued fields instead
        // of letting it reach the connection layer.
        if self.pool_size == Some(0) {
            return Err(ConfigError::InvalidValue(
                "redis pool_size must be > 0 when set".into(),
            ));
        }
        if self.connect_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "redis connect_timeout must be > 0".into(),
            ));
        }
        if self.command_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "redis command_timeout must be > 0".into(),
            ));
        }
        Ok(())
    }
}
523
/// Connection and stream settings for the NATS JetStream adapter.
#[cfg(feature = "jetstream")]
#[derive(Clone, Debug)]
pub struct JetStreamAdapterConfig {
    /// NATS server URL, e.g. `"nats://localhost:4222"`.
    pub url: String,

    /// Prefix for stream names; each shard uses
    /// `"{prefix}_shard_{shard_id}"`. Default: `"net"`.
    pub prefix: String,

    /// Timeout for establishing the connection. Default: 5 seconds.
    pub connect_timeout: Duration,

    /// Timeout applied to publish/fetch operations. Default: 5 seconds.
    ///
    /// **Worst-case wall-clock note:** `on_batch` applies this timeout
    /// separately to (1) enqueueing the publishes and (2) waiting for the
    /// per-message acks, so one `on_batch` call may run for up to
    /// **2 × request_timeout**. Size the bus-side dispatch timeout
    /// (`EventBusConfig::adapter_timeout`) with that factor in mind: an
    /// `adapter_timeout < 2 * request_timeout` can turn slow-but-healthy
    /// JetStream acks under load into timeout-induced drops.
    pub request_timeout: Duration,

    /// Cap on messages per stream; the oldest are discarded beyond it.
    /// `None` means unlimited.
    pub max_messages: Option<i64>,

    /// Cap on stored bytes per stream. `None` means unlimited.
    pub max_bytes: Option<i64>,

    /// Maximum message age within a stream. `None` means unlimited.
    pub max_age: Option<Duration>,

    /// Stream replica count for fault tolerance.
    /// Default: 1 (no replication).
    pub replicas: usize,

    /// Server-side window in which a repeated `Nats-Msg-Id` is discarded.
    ///
    /// The bus's `on_batch` retry path depends on this to make mid-batch
    /// failures idempotent. Default: 1 hour.
    ///
    /// The NATS / async-nats default is 2 minutes. Under the bus's retry
    /// policy, a slow caller (network flap, long backoff, queued-up
    /// backpressure) can resend the same `(nonce, shard, seq)` msg-id
    /// after more than 2 minutes. At that point dedup no longer applies
    /// and the event is stored at two distinct JetStream sequences. One
    /// hour comfortably covers realistic retry envelopes while still
    /// bounding the server's dedup table, which holds one entry per
    /// unique msg-id seen within the window.
    ///
    /// NOTE(review): NATS is understood to reject a stream whose
    /// duplicate window exceeds a non-zero `max_age`; confirm how the
    /// adapter reconciles this 1 h default with a shorter `max_age`.
    pub dedup_window: Duration,
}
588
#[cfg(feature = "jetstream")]
impl JetStreamAdapterConfig {
    /// Largest stream replica count NATS JetStream accepts. Stream
    /// configs asking for more are rejected by the server, so
    /// [`Self::validate`] rejects them up front.
    pub const MAX_REPLICAS: usize = 5;

    /// Create a new JetStream adapter configuration with default settings.
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            prefix: "net".into(),
            connect_timeout: Duration::from_secs(5),
            request_timeout: Duration::from_secs(5),
            max_messages: None,
            max_bytes: None,
            max_age: None,
            replicas: 1,
            dedup_window: Duration::from_secs(3600),
        }
    }

    /// Set the JetStream dedup window (must be > 0). See the field doc on
    /// [`Self::dedup_window`] for the trade-off vs the NATS default.
    pub fn with_dedup_window(mut self, window: Duration) -> Self {
        self.dedup_window = window;
        self
    }

    /// Set the stream name prefix.
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Set the connection timeout (must be > 0).
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Set the request timeout (must be > 0).
    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Set the maximum messages per stream (must be non-negative).
    pub fn with_max_messages(mut self, max: i64) -> Self {
        self.max_messages = Some(max);
        self
    }

    /// Set the maximum bytes per stream (must be non-negative).
    pub fn with_max_bytes(mut self, max: i64) -> Self {
        self.max_bytes = Some(max);
        self
    }

    /// Set the maximum age for messages.
    pub fn with_max_age(mut self, age: Duration) -> Self {
        self.max_age = Some(age);
        self
    }

    /// Set the number of replicas (1 ..= [`Self::MAX_REPLICAS`]).
    pub fn with_replicas(mut self, replicas: usize) -> Self {
        self.replicas = replicas;
        self
    }

    /// Validate the configuration.
    ///
    /// Called from `EventBusConfig::validate` so adapter misconfiguration
    /// is caught at startup rather than at the first batch dispatch.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::InvalidValue` for an empty URL, a zero
    /// connect/request timeout, a replica count outside
    /// `1..=MAX_REPLICAS`, a negative `max_messages` / `max_bytes`, or a
    /// zero `dedup_window`.
    pub fn validate(&self) -> Result<(), ConfigError> {
        if self.url.is_empty() {
            return Err(ConfigError::InvalidValue(
                "jetstream url must be non-empty".into(),
            ));
        }
        if self.connect_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "jetstream connect_timeout must be > 0".into(),
            ));
        }
        if self.request_timeout.is_zero() {
            return Err(ConfigError::InvalidValue(
                "jetstream request_timeout must be > 0".into(),
            ));
        }
        if self.replicas == 0 {
            return Err(ConfigError::InvalidValue(
                "jetstream replicas must be >= 1".into(),
            ));
        }
        // NATS caps stream replication; a larger value would only fail
        // later at stream-create time.
        if self.replicas > Self::MAX_REPLICAS {
            return Err(ConfigError::InvalidValue(format!(
                "jetstream replicas must be <= {} (got {})",
                Self::MAX_REPLICAS,
                self.replicas
            )));
        }
        // NATS rejects negative `max_messages` / `max_bytes` at
        // stream-create time, which would surface as a runtime adapter
        // error instead of at startup. The fields are typed `i64` for
        // wire-compat with the NATS API, but only non-negative values
        // make sense, so reject negatives here before any connection
        // attempt.
        if let Some(n) = self.max_messages {
            if n < 0 {
                return Err(ConfigError::InvalidValue(format!(
                    "jetstream max_messages must be non-negative (got {n})"
                )));
            }
        }
        if let Some(n) = self.max_bytes {
            if n < 0 {
                return Err(ConfigError::InvalidValue(format!(
                    "jetstream max_bytes must be non-negative (got {n})"
                )));
            }
        }
        // A zero duplicate window does not mean "no window". NATS treats
        // 0 as "use the server default" (2 minutes), which silently brings
        // back the too-short window this field exists to avoid and breaks
        // the retry-idempotence guarantee described on `dedup_window`.
        if self.dedup_window.is_zero() {
            return Err(ConfigError::InvalidValue(
                "jetstream dedup_window must be > 0".into(),
            ));
        }
        Ok(())
    }
}
705
/// Error produced when a configuration fails validation.
#[derive(Clone, Debug)]
pub enum ConfigError {
    /// A field holds an unacceptable value; the message names the field
    /// and the constraint it broke.
    InvalidValue(String),
}

impl std::fmt::Display for ConfigError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ConfigError::InvalidValue(detail) => {
                write!(f, "invalid configuration: {detail}")
            }
        }
    }
}

impl std::error::Error for ConfigError {}
722
/// Tuning knobs for dynamic shard scaling.
#[derive(Clone, Debug)]
pub struct ScalingPolicy {
    /// Fill ratio (0.0 - 1.0) above which a scale-up is triggered.
    /// Default: 0.7 (70%)
    pub fill_ratio_threshold: f64,

    /// Push latency, in nanoseconds, above which a scale-up is triggered.
    /// Default: 5ns (beyond this, contention is showing up)
    pub push_latency_threshold_ns: u64,

    /// Batch flush latency, in microseconds, above which a scale-up is
    /// triggered.
    /// Default: 1000μs (1ms)
    pub flush_latency_threshold_us: u64,

    /// Lower bound on the shard count when scaling down.
    pub min_shards: u16,

    /// Upper bound on the shard count when scaling up.
    pub max_shards: u16,

    /// Minimum pause between two scaling operations.
    /// Default: 1 second
    pub cooldown: Duration,

    /// How long a shard must stay underutilized before it is removed.
    /// Default: 10 seconds
    pub scale_down_delay: Duration,

    /// Fill ratio below which a shard counts as underutilized.
    /// Default: 0.1 (10%)
    pub underutilized_threshold: f64,

    /// Window over which scaling metrics are collected.
    /// Default: 100ms
    pub metrics_window: Duration,

    /// When false, scaling happens only on manual request.
    pub auto_scale: bool,
}
763
764impl Default for ScalingPolicy {
765    fn default() -> Self {
766        Self::default_for_cpus(num_cpus())
767    }
768}
769
770impl ScalingPolicy {
771    /// Create a default scaling policy based on CPU count.
772    /// Scales from 1 shard up to the number of physical cores.
773    pub fn default_for_cpus(cpus: u16) -> Self {
774        Self {
775            fill_ratio_threshold: 0.7,
776            push_latency_threshold_ns: 5,
777            flush_latency_threshold_us: 1000,
778            min_shards: 1,
779            max_shards: cpus,
780            cooldown: Duration::from_secs(1),
781            scale_down_delay: Duration::from_secs(10),
782            underutilized_threshold: 0.1,
783            metrics_window: Duration::from_millis(100),
784            auto_scale: true,
785        }
786    }
787
788    /// Create a policy optimized for high-throughput GPU workloads.
789    /// Uses more aggressive scaling with higher max shard count.
790    ///
791    /// `max_shards` is capped at `u16::MAX` (65 535) because shard
792    /// ids are 16-bit. On hosts with more than 32 767 CPUs the
793    /// "2× CPU count" target saturates rather than wraps — this is
794    /// the intended behavior (pre-fix this was just an implicit
795    /// `saturating_mul` artifact; the cap is now documented and
796    /// the saturation is explicit).
797    pub fn high_throughput() -> Self {
798        let cpus = num_cpus();
799        Self {
800            fill_ratio_threshold: 0.6,
801            push_latency_threshold_ns: 3,
802            flush_latency_threshold_us: 500,
803            min_shards: 4.min(cpus),
804            // `saturating_mul` clamps at u16::MAX (65 535).
805            // Documented cap; not silently wrapped.
806            max_shards: cpus.saturating_mul(2),
807            cooldown: Duration::from_millis(500),
808            scale_down_delay: Duration::from_secs(30),
809            underutilized_threshold: 0.05,
810            metrics_window: Duration::from_millis(50),
811            auto_scale: true,
812        }
813    }
814
815    /// Create a conservative policy for stable workloads.
816    pub fn conservative() -> Self {
817        let cpus = num_cpus();
818        Self {
819            fill_ratio_threshold: 0.8,
820            push_latency_threshold_ns: 10,
821            flush_latency_threshold_us: 2000,
822            min_shards: 1,
823            max_shards: cpus,
824            cooldown: Duration::from_secs(5),
825            scale_down_delay: Duration::from_secs(60),
826            underutilized_threshold: 0.05,
827            metrics_window: Duration::from_millis(200),
828            auto_scale: true,
829        }
830    }
831
832    /// Normalize the policy by auto-adjusting conflicting values.
833    ///
834    /// This allows users to set either `min_shards` or `max_shards` independently
835    /// without worrying about the other. If `max_shards < min_shards`, `max_shards`
836    /// is adjusted to equal `min_shards`.
837    pub fn normalize(mut self) -> Self {
838        if self.max_shards < self.min_shards {
839            self.max_shards = self.min_shards;
840        }
841        self
842    }
843
844    /// Validate the policy.
845    ///
846    /// `is_finite()` guards reject NaN and ±∞ explicitly before
847    /// the range check runs. NaN thresholds slip past raw `<=` /
848    /// `>` comparisons (every comparison against `f64::NaN`
849    /// returns `false`), so a config deserialized from
850    /// `0.0/0.0`-style arithmetic or an unfortunate
851    /// environment-templated string would "validate" successfully
852    /// and then sit inert at runtime — `mapper.rs:560` does
853    /// `m.fill_ratio > self.policy.fill_ratio_threshold`, which is
854    /// always `false` against NaN, so the scaler would never fire
855    /// (mirror hazard for scale-down).
856    pub fn validate(&self) -> Result<(), ConfigError> {
857        if !self.fill_ratio_threshold.is_finite() {
858            return Err(ConfigError::InvalidValue(
859                "fill_ratio_threshold must be finite (NaN/±inf rejected)".into(),
860            ));
861        }
862        if self.fill_ratio_threshold <= 0.0 || self.fill_ratio_threshold > 1.0 {
863            return Err(ConfigError::InvalidValue(
864                "fill_ratio_threshold must be in (0.0, 1.0]".into(),
865            ));
866        }
867        if !self.underutilized_threshold.is_finite() {
868            return Err(ConfigError::InvalidValue(
869                "underutilized_threshold must be finite (NaN/±inf rejected)".into(),
870            ));
871        }
872        if self.underutilized_threshold < 0.0 || self.underutilized_threshold > 1.0 {
873            return Err(ConfigError::InvalidValue(
874                "underutilized_threshold must be in [0.0, 1.0]".into(),
875            ));
876        }
877        if self.min_shards == 0 {
878            return Err(ConfigError::InvalidValue("min_shards must be > 0".into()));
879        }
880        if self.max_shards < self.min_shards {
881            return Err(ConfigError::InvalidValue(
882                "max_shards must be >= min_shards".into(),
883            ));
884        }
885        // Zero durations on the scaling path either div-by-zero
886        // (`metrics_window`), thrash the scaler (`cooldown`), or scale
887        // down on the first underutilized sample (`scale_down_delay`).
888        // Reject all three at config time.
889        if self.cooldown.is_zero() {
890            return Err(ConfigError::InvalidValue("cooldown must be > 0".into()));
891        }
892        if self.metrics_window.is_zero() {
893            return Err(ConfigError::InvalidValue(
894                "metrics_window must be > 0".into(),
895            ));
896        }
897        if self.scale_down_delay.is_zero() {
898            return Err(ConfigError::InvalidValue(
899                "scale_down_delay must be > 0".into(),
900            ));
901        }
902        Ok(())
903    }
904}
905
/// Best-effort count of CPUs available to this process, as a `u16`.
///
/// Queries `std::thread::available_parallelism`. If that query fails, it
/// falls back to `1`. A count too large for `u16` is clamped to
/// `u16::MAX` instead of wrapping.
fn num_cpus() -> u16 {
    match std::thread::available_parallelism() {
        Err(_) => 1,
        Ok(parallelism) => u16::try_from(parallelism.get()).unwrap_or(u16::MAX),
    }
}
912
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_default_config() {
        let config = EventBusConfig::default();
        assert!(config.validate().is_ok());
        assert!(config.num_shards > 0);
        assert!(config.ring_buffer_capacity.is_power_of_two());
    }

    #[test]
    fn test_builder() {
        let config = EventBusConfig::builder()
            .num_shards(8)
            .ring_buffer_capacity(1 << 16)
            .backpressure_mode(BackpressureMode::FailProducer)
            .build()
            .unwrap();

        assert_eq!(config.num_shards, 8);
        assert_eq!(config.ring_buffer_capacity, 65536);
        assert_eq!(config.backpressure_mode, BackpressureMode::FailProducer);
    }

    #[test]
    fn test_invalid_ring_buffer_capacity() {
        let result = EventBusConfig::builder()
            .ring_buffer_capacity(1000) // Not a power of 2
            .build();

        assert!(result.is_err());
    }

    #[test]
    fn test_batch_config_presets() {
        let high = BatchConfig::high_throughput();
        assert!(high.validate().is_ok());
        assert!(high.max_size > high.min_size);

        let low = BatchConfig::low_latency();
        assert!(low.validate().is_ok());
        assert!(low.max_delay < high.max_delay);
    }

    #[test]
    fn test_scaling_enabled_by_default() {
        let config = EventBusConfig::default();
        assert!(config.scaling.is_some());

        let policy = config.scaling.unwrap();
        assert_eq!(policy.max_shards, config.num_shards);
        assert!(policy.auto_scale);
    }

    #[test]
    fn test_builder_enables_scaling_by_default() {
        let config = EventBusConfig::builder().num_shards(8).build().unwrap();

        assert!(config.scaling.is_some());
        let policy = config.scaling.unwrap();
        assert_eq!(policy.max_shards, 8);
    }

    #[test]
    fn test_builder_without_scaling() {
        let config = EventBusConfig::builder()
            .num_shards(4)
            .without_scaling()
            .build()
            .unwrap();

        assert!(config.scaling.is_none());
    }

    #[test]
    fn test_with_dynamic_scaling_respects_num_shards() {
        // with_dynamic_scaling() should use num_shards for max_shards, not CPU count
        let config = EventBusConfig::builder()
            .num_shards(8)
            .with_dynamic_scaling()
            .build()
            .unwrap();

        assert!(config.scaling.is_some());
        let policy = config.scaling.unwrap();
        assert_eq!(policy.max_shards, 8);

        // Order shouldn't matter
        let config2 = EventBusConfig::builder()
            .with_dynamic_scaling()
            .num_shards(16)
            .build()
            .unwrap();

        assert!(config2.scaling.is_some());
        let policy2 = config2.scaling.unwrap();
        assert_eq!(policy2.max_shards, 16);
    }

    #[test]
    fn test_scaling_policy_presets() {
        let high = ScalingPolicy::high_throughput();
        assert!(high.validate().is_ok());
        assert!(high.max_shards >= high.min_shards);

        let conservative = ScalingPolicy::conservative();
        assert!(conservative.validate().is_ok());
        assert!(conservative.cooldown > high.cooldown);
    }

    #[test]
    fn test_scaling_policy_validation() {
        let mut policy = ScalingPolicy {
            underutilized_threshold: 0.0,
            ..Default::default()
        };

        // Valid underutilized_threshold
        assert!(policy.validate().is_ok());
        policy.underutilized_threshold = 0.5;
        assert!(policy.validate().is_ok());
        policy.underutilized_threshold = 1.0;
        assert!(policy.validate().is_ok());

        // Invalid underutilized_threshold
        policy.underutilized_threshold = -0.1;
        assert!(policy.validate().is_err());
        policy.underutilized_threshold = 1.1;
        assert!(policy.validate().is_err());

        // Reset and test fill_ratio_threshold
        policy.underutilized_threshold = 0.1;
        policy.fill_ratio_threshold = 0.0;
        assert!(policy.validate().is_err());
        policy.fill_ratio_threshold = 1.1;
        assert!(policy.validate().is_err());
    }

    // ========================================================================
    // validate() must reject NaN / ±inf thresholds
    // ========================================================================

    /// `validate()` rejects `f64::NaN` for both threshold fields.
    /// Pre-fix the raw `<=` / `>` range checks accepted NaN
    /// because every comparison with NaN returns `false`; the
    /// "validated" config then sat inert at runtime since the
    /// scaler's `m.fill_ratio > policy.fill_ratio_threshold` was
    /// always false against NaN.
    #[test]
    fn validate_rejects_nan_fill_ratio_threshold() {
        let policy = ScalingPolicy {
            fill_ratio_threshold: f64::NAN,
            ..Default::default()
        };
        assert!(
            policy.validate().is_err(),
            "NaN fill_ratio_threshold must be rejected",
        );
    }

    #[test]
    fn validate_rejects_nan_underutilized_threshold() {
        let policy = ScalingPolicy {
            underutilized_threshold: f64::NAN,
            ..Default::default()
        };
        assert!(
            policy.validate().is_err(),
            "NaN underutilized_threshold must be rejected",
        );
    }

    /// `validate()` also rejects `±inf` for both threshold fields.
    /// Unlike NaN, infinities would already be caught by the plain
    /// range checks: `+inf > 1.0`, `-inf <= 0.0` and `-inf < 0.0`
    /// are all true. The explicit `is_finite()` guard runs first
    /// and reports them with a clearer message. This test pins the
    /// observable contract, which is that an infinite threshold never
    /// validates, regardless of which check fires.
    #[test]
    fn validate_rejects_infinity_thresholds() {
        let p1 = ScalingPolicy {
            fill_ratio_threshold: f64::INFINITY,
            ..Default::default()
        };
        assert!(p1.validate().is_err());

        let p2 = ScalingPolicy {
            fill_ratio_threshold: f64::NEG_INFINITY,
            ..Default::default()
        };
        assert!(p2.validate().is_err());

        let p3 = ScalingPolicy {
            underutilized_threshold: f64::INFINITY,
            ..Default::default()
        };
        assert!(p3.validate().is_err());

        let p4 = ScalingPolicy {
            underutilized_threshold: f64::NEG_INFINITY,
            ..Default::default()
        };
        assert!(p4.validate().is_err());
    }

    #[test]
    fn test_config_validates_scaling_policy() {
        // Invalid scaling policy should cause config build to fail
        let invalid_policy = ScalingPolicy {
            min_shards: 10,
            max_shards: 5, // Invalid: min > max
            ..Default::default()
        };

        let result = EventBusConfig::builder()
            .num_shards(4)
            .scaling(invalid_policy)
            .build();

        assert!(result.is_err());

        // Another invalid policy
        let invalid_policy2 = ScalingPolicy {
            fill_ratio_threshold: 1.5, // Invalid: > 1.0
            ..Default::default()
        };

        let result2 = EventBusConfig::builder()
            .num_shards(4)
            .scaling(invalid_policy2)
            .build();

        assert!(result2.is_err());
    }

    /// `normalize()` raises `max_shards` to `min_shards` when the pair
    /// conflicts. That turns a policy `validate()` rejects (the same
    /// `min > max` shape as `test_config_validates_scaling_policy`) into
    /// a valid one. `min_shards` itself must never change.
    #[test]
    fn test_normalize_lifts_max_shards_to_min_shards() {
        let conflicting = ScalingPolicy {
            min_shards: 10,
            max_shards: 5,
            ..Default::default()
        };
        assert!(conflicting.validate().is_err());

        let normalized = conflicting.normalize();
        assert_eq!(normalized.min_shards, 10);
        assert_eq!(normalized.max_shards, 10);
        assert!(normalized.validate().is_ok());
    }

    /// `normalize()` leaves an already-consistent pair untouched,
    /// including the `min_shards == max_shards` boundary.
    #[test]
    fn test_normalize_preserves_consistent_bounds() {
        let spread = ScalingPolicy {
            min_shards: 2,
            max_shards: 8,
            ..Default::default()
        }
        .normalize();
        assert_eq!(spread.min_shards, 2);
        assert_eq!(spread.max_shards, 8);

        let pinned = ScalingPolicy {
            min_shards: 4,
            max_shards: 4,
            ..Default::default()
        }
        .normalize();
        assert_eq!(pinned.min_shards, 4);
        assert_eq!(pinned.max_shards, 4);
    }

    // Regression: high_throughput() used cpus * 2 which overflows u16
    // on machines with >32K CPUs (BUGS_3 #7).
    // NOTE(review): on ordinary hardware this cannot reproduce the
    // overflow itself; it only checks that the preset stays
    // self-consistent on the host it runs on.
    #[test]
    fn test_high_throughput_max_shards_no_overflow() {
        let policy = ScalingPolicy::high_throughput();
        assert!(policy.max_shards >= policy.min_shards);
        assert!(policy.validate().is_ok());
    }

    /// Regression: BUG_REPORT.md #3 — zero-rate `Sample` previously
    /// passed validation but div-by-zero'd downstream.
    #[test]
    fn test_validate_rejects_sample_rate_zero() {
        let result = EventBusConfig::builder()
            .backpressure_mode(BackpressureMode::Sample { rate: 0 })
            .build();
        assert!(
            result.is_err(),
            "BackpressureMode::Sample.rate == 0 must reject"
        );
    }

    /// Regression: `BatchConfig::validate` must reject an
    /// unbounded `max_size`. The adaptive-batching code in
    /// `shard/batch.rs` does arithmetic like
    /// `current_batch_size * 3 + target` against `max_size`-clamped
    /// values; with `max_size = usize::MAX` the multiplication
    /// panics in debug builds and wraps in release. The default
    /// `max_size = 10_000` is safe; only a hostile config can
    /// trip the overflow. Cap at `MAX_BATCH_SIZE_LIMIT` so the
    /// arithmetic stays well below the blast radius.
    #[test]
    fn batch_config_rejects_max_size_above_limit() {
        // Boundary at the limit is OK.
        let at_limit = BatchConfig {
            max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT,
            ..Default::default()
        };
        assert!(
            at_limit.validate().is_ok(),
            "max_size at MAX_BATCH_SIZE_LIMIT must be valid"
        );

        // Just past the limit must reject.
        let above = BatchConfig {
            max_size: BatchConfig::MAX_BATCH_SIZE_LIMIT + 1,
            ..Default::default()
        };
        assert!(
            above.validate().is_err(),
            "max_size > MAX_BATCH_SIZE_LIMIT must reject — adaptive \
             arithmetic overflows past this cap"
        );

        // The pathological case the audit's example targets.
        let hostile = BatchConfig {
            max_size: usize::MAX,
            ..Default::default()
        };
        assert!(
            hostile.validate().is_err(),
            "max_size = usize::MAX must reject (regression: \
             current_batch_size * 3 + target overflow)"
        );
    }

    /// Regression: BUG_REPORT.md #3 — zero `velocity_window` with
    /// adaptive batching div-by-zero'd the throughput calculator.
    #[test]
    fn test_validate_rejects_zero_velocity_window_when_adaptive() {
        let bad = BatchConfig {
            adaptive: true,
            velocity_window: Duration::ZERO,
            ..Default::default()
        };
        assert!(bad.validate().is_err());

        // Non-adaptive ignores the field.
        let ok = BatchConfig {
            adaptive: false,
            velocity_window: Duration::ZERO,
            ..Default::default()
        };
        assert!(ok.validate().is_ok());
    }

    /// Regression: BUG_REPORT.md #3 — zero `adapter_timeout` made
    /// every adapter call time out instantly.
    #[test]
    fn test_validate_rejects_zero_adapter_timeout() {
        let config = EventBusConfig {
            adapter_timeout: Duration::ZERO,
            ..EventBusConfig::default()
        };
        assert!(config.validate().is_err());
    }

    /// Regression: BUG_REPORT.md #3 — `ScalingPolicy` durations of
    /// zero either div-by-zero'd, thrashed the scaler, or scaled
    /// down on the first underutilized sample.
    #[test]
    fn test_validate_rejects_zero_scaling_durations() {
        let base = ScalingPolicy::default();

        let mut p = base.clone();
        p.cooldown = Duration::ZERO;
        assert!(p.validate().is_err());

        let mut p = base.clone();
        p.metrics_window = Duration::ZERO;
        assert!(p.validate().is_err());

        let mut p = base;
        p.scale_down_delay = Duration::ZERO;
        assert!(p.validate().is_err());
    }

    /// Regression: BUG_REPORT.md #3 — `RedisAdapterConfig` had no
    /// `validate()` and `pipeline_size: 0` shipped through to a
    /// runtime panic.
    #[cfg(feature = "redis")]
    #[test]
    fn test_validate_redis_pipeline_size_zero_rejected() {
        let mut redis = RedisAdapterConfig::new("redis://localhost:6379");
        redis.pipeline_size = 0;

        let result = EventBusConfig::builder()
            .adapter(AdapterConfig::Redis(redis))
            .build();
        assert!(result.is_err(), "redis pipeline_size == 0 must reject");
    }

    /// Regression: BUG_REPORT.md #3 — `JetStreamAdapterConfig` had
    /// no `validate()` either.
    #[cfg(feature = "jetstream")]
    #[test]
    fn test_validate_jetstream_replicas_zero_rejected() {
        let mut js = JetStreamAdapterConfig::new("nats://localhost:4222");
        js.replicas = 0;

        let result = EventBusConfig::builder()
            .adapter(AdapterConfig::JetStream(js))
            .build();
        assert!(result.is_err(), "jetstream replicas == 0 must reject");
    }

    /// `JetStreamAdapterConfig::validate` rejects negative
    /// `max_messages` / `max_bytes`. NATS rejects negatives at
    /// stream-create time, so without validate-time enforcement the
    /// misconfig surfaces as a runtime adapter error minutes later.
    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_rejects_negative_max_messages() {
        let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(-1);
        let err = js
            .validate()
            .expect_err("negative max_messages must reject");
        let msg = format!("{err}");
        assert!(
            msg.contains("max_messages"),
            "error must mention the field, got: {msg}"
        );
    }

    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_rejects_negative_max_bytes() {
        let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_bytes(-100);
        let err = js.validate().expect_err("negative max_bytes must reject");
        let msg = format!("{err}");
        assert!(
            msg.contains("max_bytes"),
            "error must mention the field, got: {msg}"
        );
    }

    #[cfg(feature = "jetstream")]
    #[test]
    fn validate_accepts_zero_and_positive_max_messages() {
        let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(0);
        assert!(js.validate().is_ok(), "zero must be accepted");
        let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(1_000_000);
        assert!(js.validate().is_ok(), "positive must be accepted");
    }
}
1334        assert!(js.validate().is_ok(), "zero must be accepted");
1335        let js = JetStreamAdapterConfig::new("nats://localhost:4222").with_max_messages(1_000_000);
1336        assert!(js.validate().is_ok(), "positive must be accepted");
1337    }
1338}