Skip to main content

net/shard/
mapper.rs

1//! Dynamic shard mapping and scaling.
2//!
3//! This module implements dynamic shard scaling following Approach B from
4//! DYNAMIC_SHARD_SCALING.md: increasing the number of shards and rebalancing
5//! producers across them, while maintaining SPSC (single-producer single-consumer)
6//! semantics per shard.
7//!
8//! # Core Principles
9//!
10//! - Shards never accept multiple producers
11//! - Producers get moved to new shards when load increases
12//! - Ingestion performance stays at 700M ops/sec per shard
13//! - Ordering guarantees remain intact within a shard
14//! - Total throughput scales linearly with shard count
15//!
16//! # Scaling Triggers
17//!
18//! - Ring buffer fill ratio > threshold (default 70%)
19//! - Push latency exceeds threshold (default 5ns)
20//! - Batch flush latency exceeds threshold
21//! - Session/producer count growth
22//!
23//! # Architecture
24//!
25//! ```text
26//! Producers -----+
27//!                |
28//!                v
29//!     +----------------------------+
30//!     |   Dynamic Shard Mapper     |
31//!     +----------------------------+
32//!        |         |         |
33//!        v         v         v
34//!     Shard 0   Shard 1   Shard 2 …
35//! ```
36
37use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering as AtomicOrdering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use arc_swap::ArcSwap;
42use parking_lot::RwLock;
43
44use crate::config::ScalingPolicy;
45
46/// Metrics for a single shard used for scaling decisions.
47#[derive(Debug, Clone)]
48pub struct ShardMetrics {
49    /// Shard identifier.
50    pub shard_id: u16,
51    /// Current fill ratio (0.0 - 1.0).
52    pub fill_ratio: f64,
53    /// Events ingested in the current window.
54    pub event_rate: u64,
55    /// Average push latency in nanoseconds.
56    pub avg_push_latency_ns: u64,
57    /// Average batch flush latency in microseconds.
58    pub avg_flush_latency_us: u64,
59    /// Whether this shard is in drain mode.
60    pub draining: bool,
61    /// Computed weight for load balancing (lower = less loaded).
62    pub weight: f64,
63    /// Last update timestamp.
64    pub last_updated: Instant,
65}
66
67impl ShardMetrics {
68    /// Create new metrics for a shard.
69    pub fn new(shard_id: u16) -> Self {
70        Self {
71            shard_id,
72            fill_ratio: 0.0,
73            event_rate: 0,
74            avg_push_latency_ns: 0,
75            avg_flush_latency_us: 0,
76            draining: false,
77            weight: 0.0,
78            last_updated: Instant::now(),
79        }
80    }
81
82    /// Compute the weight based on current metrics.
83    /// Lower weight = better candidate for new producers.
84    pub fn compute_weight(&mut self) {
85        // Weight formula: combines fill ratio, latency, and event rate
86        // Higher fill ratio = higher weight (avoid overloaded shards)
87        // Higher latency = higher weight
88        // Higher event rate = higher weight
89        let fill_weight = self.fill_ratio * 100.0;
90        let latency_weight = (self.avg_push_latency_ns as f64) / 10.0;
91        let rate_weight = (self.event_rate as f64) / 1_000_000.0;
92
93        self.weight = fill_weight + latency_weight + rate_weight;
94
95        // Draining shards get maximum weight (never assign new producers)
96        if self.draining {
97            self.weight = f64::MAX;
98        }
99    }
100}
101
102/// Live metrics collector for a shard (atomics for lock-free updates).
103#[derive(Debug)]
104pub struct ShardMetricsCollector {
105    /// Shard identifier.
106    shard_id: u16,
107    /// Ring buffer capacity.
108    capacity: usize,
109    /// Current buffer length (updated by shard).
110    current_len: AtomicU64,
111    /// Events ingested in current window.
112    events_in_window: AtomicU64,
113    /// Packed `(count << 32) | sum` for push latencies (ns).
114    /// Pre-fix `push_latency_sum_ns` and `push_count` were
115    /// independent `AtomicU64`s. `record_push` did two separate
116    /// `fetch_add`s, and `collect_and_reset` did two separate
117    /// `swap`s. A metrics tick interleaving between the two
118    /// `fetch_add`s captured the sum WITHOUT the count (or
119    /// vice versa); the resulting `avg = sum.checked_div(count)
120    /// .unwrap_or(0)` returned 0 in window N (sum without
121    /// count) and 0 in window N+1 (count without sum), silently
122    /// zeroing the average that drives `evaluate_scaling`'s
123    /// push-latency scale-up trigger. Packing into one u64 makes
124    /// the `(sum, count)` update atomic; the upper 32 bits hold
125    /// the count (u32::MAX = 4G calls/window — plenty) and the
126    /// lower 32 hold the sum (u32::MAX = 4 G ns ≈ 4 s, also
127    /// plenty for any sane window).
128    push_latency: AtomicU64,
129    /// Packed `(count << 32) | sum` for flush latencies (us).
130    /// Same shape and rationale as `push_latency`. The lower 32
131    /// bits hold sum-µs (u32::MAX ≈ 4 Gµs ≈ 67 minutes — far
132    /// past any plausible window).
133    flush_latency: AtomicU64,
134    /// Whether this shard is draining.
135    draining: AtomicBool,
136    /// Window start time.
137    window_start: RwLock<Instant>,
138    /// Pushes observed since `set_draining(true)` was last called.
139    /// Distinct from `events_in_window` because this counter is NOT
140    /// reset by `collect_and_reset`. `finalize_draining` reads this
141    /// instead of `events_in_window` so a drain-window-overlap with
142    /// a metrics tick can no longer race the counter to zero before
143    /// the producer is observed.
144    pushes_since_drain_start: AtomicU64,
145}
146
147impl ShardMetricsCollector {
148    /// Create a new metrics collector.
149    pub fn new(shard_id: u16, capacity: usize) -> Self {
150        Self {
151            shard_id,
152            capacity,
153            current_len: AtomicU64::new(0),
154            events_in_window: AtomicU64::new(0),
155            push_latency: AtomicU64::new(0),
156            flush_latency: AtomicU64::new(0),
157            draining: AtomicBool::new(false),
158            window_start: RwLock::new(Instant::now()),
159            pushes_since_drain_start: AtomicU64::new(0),
160        }
161    }
162
163    /// Record current buffer length.
164    #[inline]
165    pub fn record_buffer_len(&self, len: usize) {
166        self.current_len.store(len as u64, AtomicOrdering::Relaxed);
167    }
168
169    /// Record an event ingestion: bumps the per-event counters
170    /// (events_in_window, pushes_since_drain_start) AND adds a
171    /// latency sample to the packed `(count, sum)` average.
172    ///
173    /// Pre-PERF_AUDIT §1.3 this was the only API and every
174    /// successful push called it, paying the CAS loop on
175    /// `push_latency` per event under the shard mutex. The
176    /// subsampled fast path now calls [`Self::record_event_only`]
177    /// per event and [`Self::record_latency_sample`] only once
178    /// every N pushes. This composite remains for test fixtures
179    /// and the rare caller that wants both updates together.
180    #[inline]
181    pub fn record_push(&self, latency_ns: u64) {
182        self.record_event_only();
183        self.record_latency_sample(latency_ns);
184    }
185
186    /// Bump only the per-event counters (no latency sample).
187    /// Always called by the push path so per-event statistics
188    /// (event_rate, drain-window counts) stay accurate at full
189    /// resolution; the more expensive latency CAS-loop runs only
190    /// at the subsampled cadence (PERF_AUDIT §1.3).
191    #[inline]
192    pub fn record_event_only(&self) {
193        self.events_in_window.fetch_add(1, AtomicOrdering::Relaxed);
194        // Always increment — the cost is one fetch_add and the
195        // counter only matters when the shard is draining. Cheaper
196        // than branching on `self.draining.load()` in the hot path.
197        self.pushes_since_drain_start
198            .fetch_add(1, AtomicOrdering::Relaxed);
199    }
200
201    /// Add a latency sample to the packed `(count, sum)` average.
202    /// Called once per sampling boundary (default 1-in-64 pushes)
203    /// to keep the average estimator at full statistical fidelity
204    /// without paying the CAS loop on every event (PERF_AUDIT §1.3).
205    ///
206    /// Subsampling is an unbiased estimator: `sum / count` over
207    /// random samples converges to the true mean, so the
208    /// scaling heuristic's threshold comparison stays correct;
209    /// only the variance grows. With N=64, the standard error
210    /// shrinks below 1 ns at typical event rates (>1k/sec) within
211    /// a single tick window.
212    #[inline]
213    pub fn record_latency_sample(&self, latency_ns: u64) {
214        // Atomically add 1 to count (upper 32 bits) and
215        // `latency_ns` to sum (lower 32 bits). `fetch_update`
216        // CAS-loops the load-and-store, so a concurrent
217        // `collect_and_reset` swap on the same word either sees
218        // both pre-add or both post-add — no `(sum, count)`
219        // desync. Saturating ops cap at u32::MAX inside the
220        // pack window (~4 G calls / 4 s of accumulated latency),
221        // which is far beyond any sane metrics tick.
222        let _ =
223            self.push_latency
224                .fetch_update(AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |v| {
225                    let count = (v >> 32) as u32;
226                    let sum = (v & 0xFFFF_FFFF) as u32;
227                    let new_count = count.saturating_add(1) as u64;
228                    let new_sum = sum.saturating_add(latency_ns.min(u32::MAX as u64) as u32) as u64;
229                    Some((new_count << 32) | new_sum)
230                });
231    }
232
233    /// Record a batch flush.
234    #[inline]
235    pub fn record_flush(&self, latency_us: u64) {
236        // Same packed-`(count, sum)` shape as `record_push` —
237        // see that function for the desync rationale.
238        let _ = self.flush_latency.fetch_update(
239            AtomicOrdering::Relaxed,
240            AtomicOrdering::Relaxed,
241            |v| {
242                let count = (v >> 32) as u32;
243                let sum = (v & 0xFFFF_FFFF) as u32;
244                let new_count = count.saturating_add(1) as u64;
245                let new_sum = sum.saturating_add(latency_us.min(u32::MAX as u64) as u32) as u64;
246                Some((new_count << 32) | new_sum)
247            },
248        );
249    }
250
251    /// Set drain mode.
252    ///
253    /// Transitions to draining reset `pushes_since_drain_start` so
254    /// `finalize_draining` only counts pushes that arrived after the
255    /// drain began.
256    ///
257    /// A concurrent `record_push` can interleave with the store-zero
258    /// on `pushes_since_drain_start` and leave the counter at `1`
259    /// rather than `0`. That's not a correctness bug — it just defers
260    /// finalization of this shard by one metrics tick — but the
261    /// previous code did the store-zero on the counter *before*
262    /// publishing the `draining=true` flag, which made the window
263    /// slightly larger than necessary. Publishing the flag first
264    /// means any push that *observes* `draining=true` is naturally
265    /// sequenced after the reset; pushes that beat the flag publish
266    /// race the reset just like before.
267    ///
268    /// We also use `SeqCst` for the publish to give the rest of the
269    /// crate a single total order on draining transitions, which
270    /// matches the ordering on `try_enter_ingest`'s shutdown flag.
271    pub fn set_draining(&self, draining: bool) {
272        if draining {
273            // Store-zero first (so the flag publish below acts as
274            // the release-fence for both writes).
275            self.pushes_since_drain_start
276                .store(0, AtomicOrdering::SeqCst);
277        }
278        self.draining.store(draining, AtomicOrdering::SeqCst);
279    }
280
281    /// Number of pushes observed since `set_draining(true)` was
282    /// last called. Used by `finalize_draining` to detect lingering
283    /// producers that the window-reset `events_in_window` counter
284    /// can race past.
285    ///
286    /// Pre-fix used `Ordering::Relaxed`, but the writer
287    /// side (`set_draining(true)`) resets the counter under
288    /// `SeqCst`. On weakly-ordered hardware (ARM), a Relaxed
289    /// reader could observe a stale counter and `finalize_draining`
290    /// would falsely conclude the drain had flushed while
291    /// producers were still pushing. Acquire pairs with the
292    /// SeqCst release of the reset (SeqCst includes Release
293    /// semantics), making the reset happen-before this load.
294    pub fn pushes_since_drain_start(&self) -> u64 {
295        self.pushes_since_drain_start.load(AtomicOrdering::Acquire)
296    }
297
298    /// Check if draining.
299    pub fn is_draining(&self) -> bool {
300        self.draining.load(AtomicOrdering::Acquire)
301    }
302
303    /// Collect metrics and reset window counters.
304    ///
305    /// NOTE: The individual atomic swaps below are not collectively atomic with
306    /// respect to concurrent `record_push`/`record_flush` calls. This means a
307    /// push recorded between, say, the `events_in_window` swap and the
308    /// `push_count` swap could be counted in one counter but not the other for
309    /// a given window. This small inaccuracy is an accepted trade-off to
310    /// preserve the lock-free design of the hot path (`record_push` /
311    /// `record_flush`). Adding a `Mutex` here would serialize the hot path
312    /// and defeat the purpose of using atomics.
313    pub fn collect_and_reset(&self) -> ShardMetrics {
314        let current_len = self.current_len.load(AtomicOrdering::Relaxed);
315        let events = self.events_in_window.swap(0, AtomicOrdering::Relaxed);
316        // Single swap captures `(count, sum)` together; no
317        // chance of catching the sum without the matching count
318        // (or vice versa) the way two independent swaps did.
319        let push_packed = self.push_latency.swap(0, AtomicOrdering::Relaxed);
320        let push_count = push_packed >> 32;
321        let push_latency_sum = push_packed & 0xFFFF_FFFF;
322        let flush_packed = self.flush_latency.swap(0, AtomicOrdering::Relaxed);
323        let flush_count = flush_packed >> 32;
324        let flush_latency_sum = flush_packed & 0xFFFF_FFFF;
325
326        let fill_ratio = if self.capacity > 0 {
327            current_len as f64 / self.capacity as f64
328        } else {
329            0.0
330        };
331
332        let avg_push_latency = push_latency_sum.checked_div(push_count).unwrap_or(0);
333
334        let avg_flush_latency = flush_latency_sum.checked_div(flush_count).unwrap_or(0);
335
336        // Reset window
337        *self.window_start.write() = Instant::now();
338
339        let mut metrics = ShardMetrics {
340            shard_id: self.shard_id,
341            fill_ratio,
342            event_rate: events,
343            avg_push_latency_ns: avg_push_latency,
344            avg_flush_latency_us: avg_flush_latency,
345            draining: self.draining.load(AtomicOrdering::Acquire),
346            weight: 0.0,
347            last_updated: Instant::now(),
348        };
349        metrics.compute_weight();
350        metrics
351    }
352}
353
354/// Scaling decision made by the mapper.
355#[derive(Debug, Clone, PartialEq, Eq)]
356pub enum ScalingDecision {
357    /// No scaling needed.
358    None,
359    /// Scale up by adding N shards.
360    ScaleUp(u16),
361    /// Scale down by removing N shards (marks them for draining).
362    ScaleDown(u16),
363}
364
365/// Errors that can occur during scaling operations.
366#[derive(Debug, Clone)]
367pub enum ScalingError {
368    /// Invalid scaling policy.
369    InvalidPolicy(String),
370    /// Already at maximum shards.
371    AtMaxShards,
372    /// Already at minimum shards.
373    AtMinShards,
374    /// Scaling operation in cooldown.
375    InCooldown,
376    /// Shard creation failed.
377    ShardCreationFailed(String),
378}
379
380impl std::fmt::Display for ScalingError {
381    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382        match self {
383            Self::InvalidPolicy(msg) => write!(f, "invalid scaling policy: {}", msg),
384            Self::AtMaxShards => write!(f, "already at maximum shard count"),
385            Self::AtMinShards => write!(f, "already at minimum shard count"),
386            Self::InCooldown => write!(f, "scaling operation in cooldown"),
387            Self::ShardCreationFailed(msg) => write!(f, "shard creation failed: {}", msg),
388        }
389    }
390}
391
392impl std::error::Error for ScalingError {}
393
394/// State of a shard in the mapper.
395#[derive(Debug, Clone, Copy, PartialEq, Eq)]
396pub enum ShardState {
397    /// Shard is being provisioned: id allocated and metrics collector
398    /// in place, but `select_shard` must not route to it yet because
399    /// upstream workers (drain / batch) have not been spawned. Caller
400    /// transitions to `Active` via `activate` once the workers are
401    /// ready. Closes the race where a freshly-added shard accepted
402    /// producer pushes before any consumer existed.
403    Provisioning,
404    /// Shard is active and accepting producers.
405    Active,
406    /// Shard is draining (no new producers, waiting for empty).
407    Draining,
408    /// Shard is stopped and can be removed.
409    Stopped,
410}
411
412/// Information about a shard managed by the mapper.
413#[derive(Debug)]
414struct MappedShard {
415    /// Shard ID.
416    id: u16,
417    /// Current state.
418    state: ShardState,
419    /// Metrics collector.
420    metrics: Arc<ShardMetricsCollector>,
421    /// When this shard entered drain mode (if draining).
422    drain_started: Option<Instant>,
423    /// Last collected metrics.
424    last_metrics: ShardMetrics,
425    /// When this shard last transitioned to `Active`. Used by
426    /// `evaluate_scaling` to skip recently-activated shards from
427    /// scale decisions: their `last_metrics` is the
428    /// `ShardMetrics::new(id)` placeholder until at least one
429    /// `collect_metrics` cycle has run, and the placeholder
430    /// (`fill_ratio = 0.0, event_rate = 0`) trips the
431    /// underutilized trigger immediately — oscillating the system
432    /// (scale-up → next tick scale-down → next tick scale-up …)
433    /// when a fresh shard is added but hasn't yet absorbed any
434    /// traffic.
435    activated_at: Instant,
436}
437
438/// Callback type for shard lifecycle events.
439type ShardCallback = Box<dyn Fn(u16) + Send + Sync>;
440
441/// Pre-computed slice of shard ids that `select_shard` may return.
442///
443/// Holds the subset of currently-Active shards whose `last_metrics.weight`
444/// is within tolerance of the minimum. Empty when no active shards exist
445/// (in which case `select_shard` returns `u16::MAX`).
446///
447/// The table is rebuilt by [`ShardMapper::rebuild_selection_table_locked`]
448/// at every mutation that could change which shards are routable (state
449/// transitions, metric/weight refresh, scale-up/down). Hot-path readers
450/// (`select_shard`) snapshot it via `ArcSwap::load` — zero alloc, zero
451/// lock per call.
452struct SelectionTable {
453    /// Shard ids in priority order (within-tolerance of min weight).
454    /// `Box<[u16]>` rather than `Vec<u16>` because the slice is immutable
455    /// after construction; saves one word per snapshot.
456    candidates: Box<[u16]>,
457}
458
459/// Dynamic shard mapper that manages shard allocation and producer routing.
460///
461/// This is the core component for dynamic scaling. It:
462/// - Tracks metrics for all shards
463/// - Makes scaling decisions based on policy
464/// - Routes producers to the least-loaded shards
465/// - Manages shard lifecycle (active → draining → stopped)
466pub struct ShardMapper {
467    /// Mapped shards (RwLock for concurrent reads, rare writes).
468    shards: RwLock<Vec<MappedShard>>,
469    /// Pre-computed routable-shard snapshot. Updated whenever the shard
470    /// list or weights change; read lock-free by [`Self::select_shard`].
471    ///
472    /// Pre-fix [perf #2 in `docs/performance/net-perf-analysis.md`],
473    /// `select_shard` did this work on every event: two `Vec` allocs +
474    /// an `RwLock::read` + min-weight scan + tolerance filter. At 10M
475    /// ev/s that's 20M allocs/sec plus a parking_lot acquire each time.
476    /// The table is rebuilt at most once per metrics tick and once per
477    /// state transition.
478    selection_table: ArcSwap<SelectionTable>,
479    /// Current active shard count.
480    active_count: AtomicU16,
481    /// Scaling policy.
482    ///
483    /// This field is immutable for the lifetime of the mapper. The
484    /// previous `set_policy(&mut self, …)` API was unreachable in
485    /// practice — every production callsite holds the mapper behind
486    /// an `Arc`, and `Arc::get_mut` requires a strong count of 1,
487    /// which never holds once the worker pool clones the `Arc`. The
488    /// method has been removed; recreate the mapper (and
489    /// the bus that owns it) to change the policy.
490    policy: ScalingPolicy,
491    /// Ring buffer capacity for new shards.
492    ring_buffer_capacity: usize,
493    /// Last scaling operation timestamp.
494    ///
495    /// This RwLock is **logically** scoped to the
496    /// outer `shards.write()` lock — `scale_up`, `scale_down`,
497    /// and `scale_up_provisioning` all read this field and write
498    /// to it while holding `shards.write()`. The cooldown gate
499    /// is therefore atomic with the scale mutation: no caller
500    /// can pass the cooldown check, observe stale `last_scaling`,
501    /// and have its mutation interleave with another caller.
502    ///
503    /// **If you narrow `shards.write()`'s scope in any of those
504    /// callers, this implicit serialization breaks.** Either:
505    ///   - Keep the cooldown read+write inside the outer lock, OR
506    ///   - Use a `compare_exchange`-style update on a single
507    ///     `AtomicI64` of nanos so the gate is atomic on its own.
508    ///
509    /// The doc-comment is here so a future refactorer doesn't
510    /// silently break the contract.
511    last_scaling: RwLock<Option<Instant>>,
512    /// Callback for shard creation (provided by ShardManager).
513    on_shard_created: RwLock<Option<ShardCallback>>,
514    /// Callback for shard removal (provided by ShardManager).
515    on_shard_removed: RwLock<Option<ShardCallback>>,
516    /// Monotonic shard-id allocator. The next `scale_up` call gets
517    /// `fetch_add(1)` from here. Distinct from `shards.iter().max() +
518    /// 1`: that approach reused ids whenever the highest-numbered
519    /// shard had been drained-and-removed, silently merging two
520    /// unrelated shard lifetimes in any external system that keys
521    /// metrics or checkpoints on shard id. Monotonic allocation
522    /// ensures every shard ever allocated has a globally unique id
523    /// for the lifetime of this mapper.
524    next_shard_id: AtomicU16,
525}
526
527impl ShardMapper {
528    /// Create a new shard mapper with the given initial shard count and policy.
529    pub fn new(
530        initial_shards: u16,
531        ring_buffer_capacity: usize,
532        policy: ScalingPolicy,
533    ) -> Result<Self, ScalingError> {
534        let mut policy = policy.normalize();
535        // Ensure max_shards can accommodate the initial shard count
536        if policy.max_shards < initial_shards {
537            policy.max_shards = initial_shards;
538        }
539        policy
540            .validate()
541            .map_err(|e| ScalingError::InvalidPolicy(e.to_string()))?;
542
543        // Initial shards: stamp `activated_at` far enough in the
544        // past that they're not subject to the warmup skip in
545        // `evaluate_scaling`. The boot-time shards have whatever
546        // baseline traffic the system serves; they shouldn't be
547        // exempted from scale decisions just because the mapper
548        // was just constructed.
549        let boot = Instant::now()
550            .checked_sub(std::time::Duration::from_secs(3600))
551            .unwrap_or_else(Instant::now);
552        let shards: Vec<MappedShard> = (0..initial_shards)
553            .map(|id| MappedShard {
554                id,
555                state: ShardState::Active,
556                metrics: Arc::new(ShardMetricsCollector::new(id, ring_buffer_capacity)),
557                drain_started: None,
558                last_metrics: ShardMetrics::new(id),
559                activated_at: boot,
560            })
561            .collect();
562
563        let mapper = Self {
564            shards: RwLock::new(shards),
565            selection_table: ArcSwap::from_pointee(SelectionTable {
566                candidates: Box::new([]),
567            }),
568            active_count: AtomicU16::new(initial_shards),
569            policy,
570            ring_buffer_capacity,
571            last_scaling: RwLock::new(None),
572            on_shard_created: RwLock::new(None),
573            on_shard_removed: RwLock::new(None),
574            // Initial shards occupy ids `[0, initial_shards)`, so the
575            // first scale-up takes `initial_shards`.
576            next_shard_id: AtomicU16::new(initial_shards),
577        };
578        // Seed the selection table with the boot shards.
579        mapper.rebuild_selection_table_locked(&mapper.shards.read());
580        Ok(mapper)
581    }
582
583    /// Rebuild the [`Self::selection_table`] from the current shards
584    /// slice. Must be called while holding the `shards` lock (read or
585    /// write) so the snapshot it captures is consistent with the
586    /// state visible to other callers.
587    ///
588    /// Filters to `Active` shards, finds the minimum weight, and
589    /// retains every active shard whose weight is within 0.1 of the
590    /// minimum. The tolerance pre-existed the perf fix (see the old
591    /// `select_shard` body) so the new table preserves the same
592    /// routing semantics.
593    ///
594    /// NaN-tolerance edge case: if every active shard has a NaN
595    /// weight the tolerance filter excludes all of them. To preserve
596    /// the pre-fix fallback (`active[0].id`), the table falls back
597    /// to *every* active id in that case.
598    fn rebuild_selection_table_locked(&self, shards: &[MappedShard]) {
599        let mut active_ids = Vec::with_capacity(shards.len());
600        for s in shards.iter() {
601            if s.state == ShardState::Active {
602                active_ids.push(s.id);
603            }
604        }
605        if active_ids.is_empty() {
606            self.selection_table.store(Arc::new(SelectionTable {
607                candidates: Box::new([]),
608            }));
609            return;
610        }
611        let min_weight = shards
612            .iter()
613            .filter(|s| s.state == ShardState::Active)
614            .map(|s| s.last_metrics.weight)
615            .fold(f64::MAX, f64::min);
616        let mut candidates: Vec<u16> = shards
617            .iter()
618            .filter(|s| {
619                s.state == ShardState::Active && (s.last_metrics.weight - min_weight).abs() < 0.1
620            })
621            .map(|s| s.id)
622            .collect();
623        if candidates.is_empty() {
624            // All-NaN fallback. Preserves the pre-fix `active[0].id`
625            // safety net; the random pick across all active shards
626            // is a strict superset of "first active."
627            candidates = active_ids;
628        }
629        self.selection_table.store(Arc::new(SelectionTable {
630            candidates: candidates.into_boxed_slice(),
631        }));
632    }
633
634    /// Set callback for shard creation.
635    pub fn set_on_shard_created<F>(&self, callback: F)
636    where
637        F: Fn(u16) + Send + Sync + 'static,
638    {
639        *self.on_shard_created.write() = Some(Box::new(callback));
640    }
641
642    /// Set callback for shard removal.
643    pub fn set_on_shard_removed<F>(&self, callback: F)
644    where
645        F: Fn(u16) + Send + Sync + 'static,
646    {
647        *self.on_shard_removed.write() = Some(Box::new(callback));
648    }
649
650    /// Get the metrics collector for a shard.
651    pub fn metrics_collector(&self, shard_id: u16) -> Option<Arc<ShardMetricsCollector>> {
652        let shards = self.shards.read();
653        shards
654            .iter()
655            .find(|s| s.id == shard_id)
656            .map(|s| s.metrics.clone())
657    }
658
659    /// Get current active shard count.
660    pub fn active_shard_count(&self) -> u16 {
661        self.active_count.load(AtomicOrdering::Acquire)
662    }
663
664    /// Get total shard count (including draining).
665    pub fn total_shard_count(&self) -> u16 {
666        self.shards.read().len() as u16
667    }
668
669    /// Select the best shard for a new event/producer.
670    ///
671    /// Pre-fix this acquired `shards.read()` and allocated two `Vec`s
672    /// per call (active filter, candidate filter) — at 10M ev/s that
673    /// was 20M allocs/sec plus a parking_lot acquire per event.
674    /// Post-fix the routable subset is pre-computed in the private
675    /// `selection_table` field and updated only at state
676    /// transitions / metric refreshes; the hot path is a single
677    /// `ArcSwap::load` + Lemire-bias-free integer mapping.
678    ///
679    /// Semantics preserved:
680    /// - Only considers active (non-draining) shards
681    /// - Prefers shards with lower weight (less loaded)
682    /// - Falls back to a random pick across all active shards if the
683    ///   tolerance filter excludes everything (NaN-weight edge case)
684    /// - Returns `u16::MAX` when no active shard exists, so callers
685    ///   that route via the routing table get a definite miss
686    #[inline]
687    pub fn select_shard(&self, event_hash: u64) -> u16 {
688        let table = self.selection_table.load();
689        if table.candidates.is_empty() {
690            // No active shards. Pre-fix returned `u16::MAX` from the
691            // same arm — callers that look up the id in the routing
692            // table get a definite miss, which the manager already
693            // accounts for.
694            return u16::MAX;
695        }
696        // Lemire's bias-free index mapping for any `len` that fits
697        // in u64. Pre-fix used `(event_hash as usize) %
698        // candidates.len()`, which biases low-bucket indices when
699        // `candidates.len()` is not a power of two
700        // (https://lemire.me/blog/2016/06/30/fast-random-shuffling/).
701        let idx = ((event_hash as u128 * table.candidates.len() as u128) >> 64) as usize;
702        table.candidates[idx]
703    }
704
705    /// Collect metrics from all shards and update weights.
706    pub fn collect_metrics(&self) -> Vec<ShardMetrics> {
707        let mut shards = self.shards.write();
708        let result: Vec<ShardMetrics> = shards
709            .iter_mut()
710            .map(|s| {
711                s.last_metrics = s.metrics.collect_and_reset();
712                s.last_metrics.clone()
713            })
714            .collect();
715        // Weights just changed — refresh the selection table so
716        // `select_shard` observes the new min-weight candidate set
717        // without re-scanning on every event.
718        self.rebuild_selection_table_locked(&shards);
719        result
720    }
721
722    /// Evaluate scaling based on current metrics.
723    ///
724    /// Returns a scaling decision without executing it.
725    pub fn evaluate_scaling(&self) -> ScalingDecision {
726        if !self.policy.auto_scale {
727            return ScalingDecision::None;
728        }
729
730        // Check cooldown
731        if let Some(last) = *self.last_scaling.read() {
732            if last.elapsed() < self.policy.cooldown {
733                return ScalingDecision::None;
734            }
735        }
736
737        let shards = self.shards.read();
738        let active_count = self.active_count.load(AtomicOrdering::Acquire);
739
740        // Check for scale-up triggers
741        let mut overloaded_count = 0;
742        let mut underutilized_count = 0;
743
744        // Warmup window for freshly-activated shards. A just-
745        // activated shard's `last_metrics` is the
746        // `ShardMetrics::new(id)` placeholder
747        // (`fill_ratio = 0.0, event_rate = 0`) until at least one
748        // `collect_metrics` cycle has run. The placeholder
749        // immediately matches the underutilized trigger
750        // (`fill_ratio < underutilized_threshold && event_rate
751        // == 0`), so a fresh shard added by scale-up would
752        // immediately count as underutilized on the next
753        // `evaluate_scaling` and trigger scale-down — oscillating
754        // the system. Reuse `policy.cooldown` as the warmup
755        // window: it's already the minimum gap between scaling
756        // actions, and a shard collected at least once within
757        // that window has accumulated real metrics.
758        let now = Instant::now();
759        let warmup = self.policy.cooldown;
760
761        for shard in shards.iter() {
762            if shard.state != ShardState::Active {
763                continue;
764            }
765
766            // Skip the placeholder-metrics window for freshly-
767            // activated shards. They count toward `active_count`
768            // (so the budget math stays consistent) but don't
769            // tip the overload/underutilized tallies.
770            if now.duration_since(shard.activated_at) < warmup {
771                continue;
772            }
773
774            let m = &shard.last_metrics;
775
776            // Scale-up triggers
777            if m.fill_ratio > self.policy.fill_ratio_threshold
778                || m.avg_push_latency_ns > self.policy.push_latency_threshold_ns
779                || m.avg_flush_latency_us > self.policy.flush_latency_threshold_us
780            {
781                overloaded_count += 1;
782            }
783
784            // Scale-down triggers
785            if m.fill_ratio < self.policy.underutilized_threshold && m.event_rate == 0 {
786                underutilized_count += 1;
787            }
788        }
789
790        // Scale up if more than half of shards are overloaded
791        if overloaded_count > active_count / 2 && active_count < self.policy.max_shards {
792            // Add shards proportional to overload
793            let shards_to_add = (overloaded_count / 2)
794                .max(1)
795                .min(self.policy.max_shards - active_count);
796            return ScalingDecision::ScaleUp(shards_to_add);
797        }
798
799        // Scale down if more than half of shards are underutilized
800        // and we're above minimum
801        if underutilized_count > active_count / 2 && active_count > self.policy.min_shards {
802            let shards_to_remove = (underutilized_count / 2)
803                .max(1)
804                .min(active_count - self.policy.min_shards);
805            return ScalingDecision::ScaleDown(shards_to_remove);
806        }
807
808        ScalingDecision::None
809    }
810
811    /// Validate cooldown + max-shards budget for an `add count`
812    /// scale-up request. Cheap pre-check that doesn't take the
813    /// write lock — callers re-check under the lock.
814    fn check_scale_up_budget(&self, count: u16) -> Result<(), ScalingError> {
815        let current = self.active_count.load(AtomicOrdering::Acquire);
816        let would_be = current
817            .checked_add(count)
818            .ok_or(ScalingError::AtMaxShards)?;
819        if would_be > self.policy.max_shards {
820            return Err(ScalingError::AtMaxShards);
821        }
822        let last = self.last_scaling.read();
823        if let Some(ts) = *last {
824            if ts.elapsed() < self.policy.cooldown {
825                return Err(ScalingError::InCooldown);
826            }
827        }
828        Ok(())
829    }
830
831    /// Allocate `count` shard ids and push their `MappedShard`
832    /// records into `shards` with the supplied `state`. The caller
833    /// already holds `self.shards.write()` and is responsible for
834    /// dropping it before notifying callbacks. Returns the allocated
835    /// ids in order.
836    ///
837    /// Performs the budget + cooldown re-check under the write lock,
838    /// the next_shard_id allocation, and the per-shard push. Does NOT
839    /// touch `active_count` — `scale_up` bumps it for `Active` shards;
840    /// `Provisioning` shards bump it later when `activate` fires.
841    fn allocate_shards_inner(
842        &self,
843        count: u16,
844        state: ShardState,
845        shards: &mut Vec<MappedShard>,
846    ) -> Result<Vec<u16>, ScalingError> {
847        self.allocate_shards_inner_with_policy(count, state, shards, false)
848    }
849
850    fn allocate_shards_inner_with_policy(
851        &self,
852        count: u16,
853        state: ShardState,
854        shards: &mut Vec<MappedShard>,
855        force: bool,
856    ) -> Result<Vec<u16>, ScalingError> {
857        // Re-check budget under the write lock — two concurrent
858        // scale-up callers could both pass the read-locked early
859        // check, both serialize through `shards.write()`, and both
860        // succeed without this re-check.
861        if force {
862            // Budget only — skip cooldown for operator-initiated paths.
863            let current = self.active_count.load(AtomicOrdering::Acquire);
864            let would_be = current
865                .checked_add(count)
866                .ok_or(ScalingError::AtMaxShards)?;
867            if would_be > self.policy.max_shards {
868                return Err(ScalingError::AtMaxShards);
869            }
870        } else {
871            self.check_scale_up_budget(count)?;
872        }
873
874        let first_id = self.next_shard_id.load(AtomicOrdering::Relaxed);
875        let last_needed = first_id
876            .checked_add(count.saturating_sub(1))
877            .ok_or(ScalingError::AtMaxShards)?;
878        // Reserve `u16::MAX` as a sentinel so the post-allocation
879        // store cannot wrap.
880        if last_needed == u16::MAX {
881            return Err(ScalingError::AtMaxShards);
882        }
883        // `first_id + count == last_needed + 1`. We already
884        // refused `last_needed == u16::MAX` above, so the sum is
885        // provably <= u16::MAX. Use `checked_add` anyway as a
886        // belt-and-suspenders guard: a future change that
887        // weakens the sentinel check would otherwise reach an
888        // unchecked u16 wrap here, silently rolling
889        // `next_shard_id` back to 0 and then re-issuing already-
890        // allocated ids.
891        let next_id_after = first_id
892            .checked_add(count)
893            .ok_or(ScalingError::AtMaxShards)?;
894        self.next_shard_id
895            .store(next_id_after, AtomicOrdering::Relaxed);
896
897        let mut new_ids = Vec::with_capacity(count as usize);
898        let now = Instant::now();
899        for i in 0..count {
900            let new_id = first_id + i;
901            shards.push(MappedShard {
902                id: new_id,
903                state,
904                metrics: Arc::new(ShardMetricsCollector::new(
905                    new_id,
906                    self.ring_buffer_capacity,
907                )),
908                drain_started: None,
909                last_metrics: ShardMetrics::new(new_id),
910                // Stamp the activation moment so `evaluate_scaling`
911                // can skip this shard until at least one collect
912                // cycle has run. Prevents the placeholder
913                // (`fill_ratio = 0, event_rate = 0`) from
914                // immediately tripping the underutilized trigger
915                // and oscillating the system.
916                activated_at: now,
917            });
918            new_ids.push(new_id);
919        }
920        Ok(new_ids)
921    }
922
923    /// Execute a scale-up operation.
924    ///
925    /// Creates new shards in the `Active` state and makes them
926    /// immediately available for routing. Use [`scale_up_provisioning`]
927    /// if upstream workers (drain / batch) need to be wired up before
928    /// the shard becomes selectable — otherwise producer pushes can
929    /// race ahead of consumer creation.
930    ///
931    /// [`scale_up_provisioning`]: Self::scale_up_provisioning
932    pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
933        // Short-circuit `count == 0` so a no-op call doesn't bump the
934        // cooldown timestamp or trip the `u16::MAX` sentinel check
935        // (which previously fired spuriously when
936        // `first_id == u16::MAX` even though zero ids were being
937        // allocated).
938        if count == 0 {
939            return Ok(Vec::new());
940        }
941
942        self.check_scale_up_budget(count)?;
943
944        let mut shards = self.shards.write();
945        let new_ids = self.allocate_shards_inner(count, ShardState::Active, &mut shards)?;
946
947        // Update counts. `fetch_add` cannot wrap here — the
948        // `check_scale_up_budget` gate above ensures `current + count <=
949        // max_shards <= u16::MAX` — but keep the ordering explicit
950        // so the next contender's cooldown re-check sees the fresh
951        // timestamp.
952        self.active_count.fetch_add(count, AtomicOrdering::Release);
953        *self.last_scaling.write() = Some(Instant::now());
954
955        // New active shards are routable immediately. Refresh the
956        // selection table before dropping the lock so any concurrent
957        // `select_shard` observes the new ids in its very next call.
958        self.rebuild_selection_table_locked(&shards);
959
960        // Drop the write lock before notifying callbacks — they
961        // are user-supplied and may take arbitrary time.
962        drop(shards);
963
964        if let Some(callback) = self.on_shard_created.read().as_ref() {
965            for &id in &new_ids {
966                callback(id);
967            }
968        }
969
970        Ok(new_ids)
971    }
972
973    /// Like [`scale_up`], but the new shards are created in the
974    /// `Provisioning` state. They receive an id and a metrics
975    /// collector, but `select_shard` will not route to them and they
976    /// are excluded from `active_shard_count` / `evaluate_scaling`
977    /// until the caller transitions each shard with [`activate`].
978    ///
979    /// Use this when upstream consumer infrastructure (drain/batch
980    /// workers, mpsc channels, etc.) must be wired up *before* the
981    /// shard becomes selectable. Without this gating, producers can
982    /// observe the shard via `select_shard`, push into its ring
983    /// buffer, and never have those events drained.
984    ///
985    /// Returns the allocated ids in order. Cooldown / `max_shards`
986    /// gating matches `scale_up` so that staged allocation cannot be
987    /// used to bypass the policy.
988    ///
989    /// [`scale_up`]: Self::scale_up
990    /// [`activate`]: Self::activate
991    pub fn scale_up_provisioning(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
992        if count == 0 {
993            return Ok(Vec::new());
994        }
995
996        self.check_scale_up_budget(count)?;
997
998        let mut shards = self.shards.write();
999        let new_ids = self.allocate_shards_inner(count, ShardState::Provisioning, &mut shards)?;
1000
1001        // Bump cooldown but NOT active_count — these shards are
1002        // not active yet. `activate` bumps active_count when each
1003        // becomes selectable.
1004        *self.last_scaling.write() = Some(Instant::now());
1005        drop(shards);
1006        // Intentionally do NOT fire `on_shard_created` here — the
1007        // callback signals "shard is live"; provisioning shards are
1008        // not. `activate` fires it instead.
1009        Ok(new_ids)
1010    }
1011
1012    /// Allocate `count` Provisioning shards, bypassing the cooldown gate.
1013    ///
1014    /// Used by operator-initiated `manual_scale_up` paths. The
1015    /// cooldown exists to prevent the auto-scaling monitor from
1016    /// scaling-up too aggressively in response to transient
1017    /// load spikes; a manual call from an operator is a
1018    /// deliberate request that should not be rate-limited by
1019    /// the auto-scaling cadence. The budget check (against
1020    /// `max_shards`) still applies.
1021    ///
1022    /// Pre-fix `manual_scale_up(N)` looped `add_shard()` N
1023    /// times, each call invoking `scale_up_provisioning(1)`
1024    /// which bumped `last_scaling`. The second call then
1025    /// immediately failed with `InCooldown` (default 30s
1026    /// cooldown), leaving the first shard half-added and
1027    /// returning an error to the operator with no rollback.
1028    pub fn scale_up_provisioning_force(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1029        if count == 0 {
1030            return Ok(Vec::new());
1031        }
1032
1033        let mut shards = self.shards.write();
1034        let new_ids = self.allocate_shards_inner_with_policy(
1035            count,
1036            ShardState::Provisioning,
1037            &mut shards,
1038            true, // skip cooldown
1039        )?;
1040
1041        // Still bump the cooldown timestamp so the next
1042        // *auto-scaling* tick respects the cooldown floor.
1043        *self.last_scaling.write() = Some(Instant::now());
1044        drop(shards);
1045        Ok(new_ids)
1046    }
1047
1048    /// Transition a `Provisioning` shard to `Active`.
1049    ///
1050    /// Returns `Ok(true)` if a state transition actually occurred
1051    /// (Provisioning → Active) and `Ok(false)` if the shard was
1052    /// already `Active` — the latter is the idempotent path.
1053    /// Returns `InvalidPolicy` for unknown or `Draining`/`Stopped`
1054    /// shards — those states require a different lifecycle path.
1055    /// Bumps `active_count` and notifies the `on_shard_created`
1056    /// callback exactly once per real transition.
1057    ///
1058    /// Pre-fix this returned `Result<(), ScalingError>`,
1059    /// so callers (notably `ShardManager::activate_shard`) could
1060    /// not tell whether they had bumped the live count or not and
1061    /// double-incremented their own `num_shards` on every
1062    /// idempotent call.
1063    pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError> {
1064        let mut shards = self.shards.write();
1065        let shard = shards
1066            .iter_mut()
1067            .find(|s| s.id == shard_id)
1068            .ok_or_else(|| {
1069                ScalingError::InvalidPolicy(format!("activate: shard {} not found", shard_id))
1070            })?;
1071        match shard.state {
1072            ShardState::Active => return Ok(false),
1073            ShardState::Provisioning => {
1074                // Gate activation on
1075                // `active_count < max_shards`. The budget gate at
1076                // `check_scale_up_budget` only counts ALREADY-active
1077                // shards — multiple `scale_up_provisioning(1)` calls
1078                // can each pass (they don't bump `active_count`),
1079                // and an unconditional `fetch_add(1)` here would
1080                // push past `max_shards`. Subsequent
1081                // `evaluate_scaling`'s `max_shards - active_count`
1082                // arithmetic would then underflow u16 (debug-build
1083                // panic; release wraps to ~65530). Activate-time
1084                // re-checks the budget with the lock held; the
1085                // Provisioning shard stays in Provisioning state and
1086                // the caller (e.g. `add_shard_internal`'s rollback)
1087                // is responsible for tearing it down.
1088                //
1089                // The load + state mutation + `fetch_add` all happen
1090                // while we hold the `shards.write()` guard so a
1091                // concurrent `activate(distinct_id)` reads the
1092                // already-bumped `active_count` and hits
1093                // `AtMaxShards` instead of squeezing through the
1094                // window between our state update and our
1095                // `fetch_add`. Pre-fix the `fetch_add` ran after
1096                // `drop(shards)` — two activates could each see a
1097                // stale count below `max_shards` and both bump,
1098                // transiently overshooting the budget.
1099                let current = self.active_count.load(AtomicOrdering::Acquire);
1100                if current >= self.policy.max_shards {
1101                    return Err(ScalingError::AtMaxShards);
1102                }
1103                shard.state = ShardState::Active;
1104                // Re-stamp `activated_at` so `evaluate_scaling`
1105                // gives this freshly-activated shard a warmup
1106                // window before counting it in the
1107                // overloaded/underutilized tallies. The
1108                // Provisioning → Active transition is the moment
1109                // traffic starts flowing; the shard's
1110                // `last_metrics` is still the
1111                // `ShardMetrics::new(id)` placeholder until the
1112                // next `collect_metrics` cycle.
1113                shard.activated_at = Instant::now();
1114                // Publish the increment while still holding the
1115                // write lock. `Release` here pairs with the
1116                // `Acquire` load above so any later activator that
1117                // takes the same lock observes our bump.
1118                self.active_count.fetch_add(1, AtomicOrdering::Release);
1119            }
1120            ShardState::Draining | ShardState::Stopped => {
1121                return Err(ScalingError::InvalidPolicy(format!(
1122                    "activate: shard {} is in state {:?}, cannot activate",
1123                    shard_id, shard.state
1124                )));
1125            }
1126        }
1127        // The shard just became routable. Refresh the selection
1128        // table before dropping the lock so the next `select_shard`
1129        // observes it.
1130        self.rebuild_selection_table_locked(&shards);
1131        drop(shards);
1132
1133        if let Some(callback) = self.on_shard_created.read().as_ref() {
1134            callback(shard_id);
1135        }
1136        Ok(true)
1137    }
1138
1139    /// Drain a specific shard by id, transitioning it from `Active`
1140    /// to `Draining`.
1141    ///
1142    /// Companion to `ShardManager::drain_shard`. The previous
1143    /// implementation only flipped the metrics collector's `draining`
1144    /// atomic; this version atomically updates `MappedShard.state`
1145    /// (so `select_shard` stops routing to the shard) and decrements
1146    /// `active_count` (so `evaluate_scaling`'s budget math stays
1147    /// consistent with `scale_down`). Returns an error if the shard
1148    /// is not in `Active` state, or if doing so would push the active
1149    /// count below `min_shards`.
1150    pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError> {
1151        let mut shards = self.shards.write();
1152        let current_active = self.active_count.load(AtomicOrdering::Acquire);
1153        if current_active <= self.policy.min_shards {
1154            return Err(ScalingError::AtMinShards);
1155        }
1156        let shard = shards
1157            .iter_mut()
1158            .find(|s| s.id == shard_id)
1159            .ok_or_else(|| {
1160                ScalingError::InvalidPolicy(format!("drain_specific: shard {} not found", shard_id))
1161            })?;
1162        match shard.state {
1163            ShardState::Active => {
1164                shard.state = ShardState::Draining;
1165                shard.drain_started = Some(Instant::now());
1166                shard.metrics.set_draining(true);
1167            }
1168            ShardState::Draining => return Ok(()),
1169            ShardState::Provisioning | ShardState::Stopped => {
1170                return Err(ScalingError::InvalidPolicy(format!(
1171                    "drain_specific: shard {} is in state {:?}, cannot drain",
1172                    shard_id, shard.state
1173                )));
1174            }
1175        }
1176        // The drained shard is no longer routable. Refresh the
1177        // selection table before dropping the lock.
1178        self.rebuild_selection_table_locked(&shards);
1179        drop(shards);
1180        self.active_count.fetch_sub(1, AtomicOrdering::Release);
1181        // Bump `last_scaling` so a subsequent `scale_up` is gated
1182        // by the cooldown floor. Pre-fix `drain_specific` removed
1183        // a shard from Active without touching `last_scaling`, so
1184        // the sequence `drain_specific(id) → scale_up(N)`
1185        // bypassed the cooldown — `scale_down` writes
1186        // `last_scaling` precisely for this reason. From the
1187        // budget-math perspective `drain_specific` IS a scale-
1188        // down (it decrements `active_count` and trips the
1189        // `min_shards` floor), so it should also gate
1190        // re-expansion the same way.
1191        *self.last_scaling.write() = Some(Instant::now());
1192        Ok(())
1193    }
1194
1195    /// Start draining shards for scale-down.
1196    ///
1197    /// Marks shards as draining so they stop receiving new events.
1198    /// Shards will be removed once they're empty.
1199    pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1200        // Early checks (may race, but avoid acquiring the write lock)
1201        let current = self.active_count.load(AtomicOrdering::Acquire);
1202        if current <= self.policy.min_shards {
1203            return Err(ScalingError::AtMinShards);
1204        }
1205
1206        let to_drain = count.min(current - self.policy.min_shards);
1207        if to_drain == 0 {
1208            return Err(ScalingError::AtMinShards);
1209        }
1210        {
1211            let last = self.last_scaling.read();
1212            if let Some(ts) = *last {
1213                if ts.elapsed() < self.policy.cooldown {
1214                    return Err(ScalingError::InCooldown);
1215                }
1216            }
1217        }
1218
1219        let mut shards = self.shards.write();
1220
1221        // Re-check under the lock to prevent race conditions (double-check pattern)
1222        let current = self.active_count.load(AtomicOrdering::Acquire);
1223        if current <= self.policy.min_shards {
1224            return Err(ScalingError::AtMinShards);
1225        }
1226        let to_drain = count.min(current - self.policy.min_shards);
1227        if to_drain == 0 {
1228            return Err(ScalingError::AtMinShards);
1229        }
1230        // Re-check cooldown under the same write lock that gates
1231        // mutation — see the matching note in `scale_up`.
1232        {
1233            let last = self.last_scaling.read();
1234            if let Some(ts) = *last {
1235                if ts.elapsed() < self.policy.cooldown {
1236                    return Err(ScalingError::InCooldown);
1237                }
1238            }
1239        }
1240
1241        let mut drained_ids = Vec::with_capacity(to_drain as usize);
1242
1243        // Find shards with lowest weight (least utilized) to drain
1244        let mut active_indices: Vec<_> = shards
1245            .iter()
1246            .enumerate()
1247            .filter(|(_, s)| s.state == ShardState::Active)
1248            .map(|(i, s)| (i, s.last_metrics.weight))
1249            .collect();
1250
1251        // Sort by weight (ascending - drain least utilized first)
1252        active_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
1253
1254        // Mark shards for draining
1255        for (idx, _) in active_indices.into_iter().take(to_drain as usize) {
1256            shards[idx].state = ShardState::Draining;
1257            shards[idx].drain_started = Some(Instant::now());
1258            shards[idx].metrics.set_draining(true);
1259            drained_ids.push(shards[idx].id);
1260        }
1261
1262        // Update count
1263        self.active_count
1264            .fetch_sub(to_drain, AtomicOrdering::Release);
1265        *self.last_scaling.write() = Some(Instant::now());
1266
1267        // Drained shards are no longer routable. Refresh the
1268        // selection table while still holding the lock.
1269        self.rebuild_selection_table_locked(&shards);
1270
1271        Ok(drained_ids)
1272    }
1273
1274    /// Check draining shards and finalize those that are empty.
1275    ///
1276    /// Returns IDs of shards that were stopped.
1277    ///
1278    /// This predicate looks ONLY at the ring buffer
1279    /// (`current_len` + `pushes_since_drain_start`); it does NOT
1280    /// probe the per-shard mpsc channel or the BatchWorker's
1281    /// `current_batch`. A shard that the predicate flags as empty
1282    /// can still have events queued in those two places. The
1283    /// correctness gate is therefore `bus::remove_shard_internal`,
1284    /// which awaits the BatchWorker's `JoinHandle` before
1285    /// constructing the stranded-flush batch — see that function's
1286    /// step 3 for the rationale. Tightening this predicate is a
1287    /// defense-in-depth follow-up; a stricter ring-buffer-empty
1288    /// signal here would only narrow an already-closed window.
1289    pub fn finalize_draining(&self) -> Vec<u16> {
1290        let mut shards = self.shards.write();
1291        let mut stopped = Vec::new();
1292
1293        for shard in shards.iter_mut() {
1294            if shard.state == ShardState::Draining {
1295                // Check if shard is empty by reading current_len directly,
1296                // avoiding collect_and_reset() which destructively zeros all counters.
1297                let current_len = shard.metrics.current_len.load(AtomicOrdering::Relaxed);
1298                let fill_ratio = if shard.metrics.capacity > 0 {
1299                    current_len as f64 / shard.metrics.capacity as f64
1300                } else {
1301                    0.0
1302                };
1303                // Previously read `events_in_window` here, which
1304                // `collect_and_reset` zeros every metrics tick. A
1305                // producer push that landed in the window between two
1306                // ticks could be silently zeroed out, so a draining
1307                // shard whose buffer transiently emptied was finalized
1308                // with a producer still attached.
1309                // `pushes_since_drain_start` is a separate counter
1310                // that is only reset by `set_draining(true)`, so any
1311                // push observed since the drain began is sticky —
1312                // exactly the signal we want.
1313                // Acquire pairs with `set_draining`'s SeqCst reset so
1314                // the load can't observe a stale value from before the
1315                // drain began. A Relaxed load here let weakly-ordered
1316                // hardware see the pre-reset count and finalize while
1317                // a producer was still pushing.
1318                let pushes_after_drain = shard.metrics.pushes_since_drain_start();
1319                if fill_ratio == 0.0 && pushes_after_drain == 0 {
1320                    // Check if we've waited long enough
1321                    if let Some(drain_start) = shard.drain_started {
1322                        if drain_start.elapsed() > Duration::from_millis(100) {
1323                            shard.state = ShardState::Stopped;
1324                            stopped.push(shard.id);
1325                        }
1326                    }
1327                }
1328            }
1329        }
1330
1331        // Draining → Stopped transitions: no impact on the routable
1332        // set (Draining was already excluded), but rebuild for
1333        // consistency in case `last_metrics.weight` was stale on a
1334        // shard that just stopped. Cheap; bounded by shard count.
1335        if !stopped.is_empty() {
1336            self.rebuild_selection_table_locked(&shards);
1337        }
1338
1339        // Drop the write lock BEFORE notifying. The callback is
1340        // user-supplied and may re-enter the mapper (`shard_state`,
1341        // `select_shard`, `metrics_collector`, …), each of which
1342        // acquires `shards.read()`. `parking_lot::RwLock` is not
1343        // recursive, so a re-entrant read attempt while we hold a
1344        // write would deadlock. `scale_up`'s callback path already
1345        // releases its lock before calling out — mirror that
1346        // here.
1347        drop(shards);
1348
1349        if !stopped.is_empty() {
1350            if let Some(callback) = self.on_shard_removed.read().as_ref() {
1351                for &id in &stopped {
1352                    callback(id);
1353                }
1354            }
1355        }
1356
1357        stopped
1358    }
1359
1360    /// Remove a specific shard from the mapper if it is in the
1361    /// `Stopped` state. Used by `ShardManager::remove_shard` so a
1362    /// per-shard cleanup doesn't disturb sibling `Stopped`
1363    /// entries — which a sequential `manual_scale_down` loop
1364    /// still needs to look up state for. Returns `true` if the
1365    /// shard existed and was Stopped (and was removed).
1366    pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool {
1367        let mut shards = self.shards.write();
1368        let before = shards.len();
1369        shards.retain(|s| !(s.id == shard_id && s.state == ShardState::Stopped));
1370        let removed = shards.len() < before;
1371        if removed {
1372            // Stopped shards weren't routable, but the underlying
1373            // slice changed length / order — rebuild so the table
1374            // doesn't dangle a stale id.
1375            self.rebuild_selection_table_locked(&shards);
1376        }
1377        removed
1378    }
1379
1380    /// Remove stopped shards from the mapper.
1381    pub fn remove_stopped_shards(&self) -> Vec<u16> {
1382        let mut shards = self.shards.write();
1383        let before = shards.len();
1384        let removed: Vec<u16> = shards
1385            .iter()
1386            .filter(|s| s.state == ShardState::Stopped)
1387            .map(|s| s.id)
1388            .collect();
1389
1390        shards.retain(|s| s.state != ShardState::Stopped);
1391
1392        if shards.len() < before {
1393            tracing::info!(
1394                removed = removed.len(),
1395                remaining = shards.len(),
1396                "Removed stopped shards"
1397            );
1398            // Same reason as `remove_specific_stopped_shard`.
1399            self.rebuild_selection_table_locked(&shards);
1400        }
1401
1402        removed
1403    }
1404
1405    /// Get the state of a specific shard.
1406    pub fn shard_state(&self, shard_id: u16) -> Option<ShardState> {
1407        self.shards
1408            .read()
1409            .iter()
1410            .find(|s| s.id == shard_id)
1411            .map(|s| s.state)
1412    }
1413
1414    /// Get all active shard IDs.
1415    pub fn active_shard_ids(&self) -> Vec<u16> {
1416        self.shards
1417            .read()
1418            .iter()
1419            .filter(|s| s.state == ShardState::Active)
1420            .map(|s| s.id)
1421            .collect()
1422    }
1423
1424    /// Get all shard IDs (including draining).
1425    pub fn all_shard_ids(&self) -> Vec<u16> {
1426        self.shards
1427            .read()
1428            .iter()
1429            .filter(|s| s.state != ShardState::Stopped)
1430            .map(|s| s.id)
1431            .collect()
1432    }
1433
1434    /// Get the scaling policy.
1435    pub fn policy(&self) -> &ScalingPolicy {
1436        &self.policy
1437    }
1438    // `set_policy` previously took `&mut self` and was unreachable
1439    // through the `Arc<ShardMapper>` that the production code holds
1440    // (`Arc::get_mut` fails once the worker pool has cloned the Arc).
1441    // The method has been removed — recreate the mapper / bus to
1442    // change the policy.
1443}
1444
1445#[cfg(test)]
1446mod tests {
1447    #![allow(
1448        clippy::disallowed_methods,
1449        reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1450    )]
1451    use super::*;
1452
1453    #[test]
1454    fn test_shard_mapper_creation() {
1455        let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1456        assert_eq!(mapper.active_shard_count(), 4);
1457        assert_eq!(mapper.total_shard_count(), 4);
1458    }
1459
1460    #[test]
1461    fn test_select_shard_distributes() {
1462        let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1463
1464        // Different hashes should potentially select different shards
1465        let mut selected = std::collections::HashSet::new();
1466        for i in 0..100u64 {
1467            let shard = mapper.select_shard(i * 12345);
1468            selected.insert(shard);
1469        }
1470
1471        // With 4 shards, we should hit multiple
1472        assert!(!selected.is_empty());
1473    }
1474
1475    /// Pin for perf #2: the selection table reflects the routable
1476    /// subset *exactly*, and `select_shard` reads it lock-free.
1477    /// A regression that, say, forgot to refresh after `drain_specific`
1478    /// would let `select_shard` continue routing to the draining
1479    /// shard — observable here as the drained id appearing in
1480    /// `select_shard`'s output.
1481    #[test]
1482    fn selection_table_reflects_active_subset_after_state_transitions() {
1483        // ScalingPolicy::validate rejects `min_shards == 0`; use 1
1484        // and verify we can drain down to the floor via the
1485        // official `drain_specific` API.
1486        let policy = ScalingPolicy {
1487            min_shards: 1,
1488            max_shards: 8,
1489            cooldown: Duration::from_nanos(1),
1490            ..Default::default()
1491        };
1492        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1493
1494        // Initial: all 3 shards routable.
1495        let mut seen = std::collections::HashSet::new();
1496        for i in 0..500u64 {
1497            seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1498        }
1499        assert_eq!(seen, std::collections::HashSet::from([0, 1, 2]));
1500
1501        // Drain shard 1 → only 0 and 2 should be routable.
1502        mapper.drain_specific(1).unwrap();
1503        let mut seen = std::collections::HashSet::new();
1504        for i in 0..500u64 {
1505            seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1506        }
1507        assert_eq!(
1508            seen,
1509            std::collections::HashSet::from([0, 2]),
1510            "drained shard must vanish from select_shard output; \
1511             a regression here would let select_shard route to a \
1512             Draining shard (blocking finalization)",
1513        );
1514
1515        // Drain shard 2 → only 0 left (at min_shards floor).
1516        mapper.drain_specific(2).unwrap();
1517        let mut seen = std::collections::HashSet::new();
1518        for i in 0..500u64 {
1519            seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1520        }
1521        assert_eq!(seen, std::collections::HashSet::from([0]));
1522    }
1523
1524    /// Pin: `select_shard` does NOT acquire `self.shards` (read or
1525    /// write). The whole perf #2 fix is that the hot path is
1526    /// `ArcSwap::load` only — no parking_lot. We pin this by
1527    /// holding a write lock on `shards` and asserting
1528    /// `select_shard` still returns immediately.
1529    #[test]
1530    fn select_shard_does_not_acquire_shards_lock() {
1531        let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1532
1533        // Hold the write lock. If `select_shard` tried to acquire
1534        // any flavor of `shards.{read,write}()` it would block
1535        // forever — parking_lot is not recursive.
1536        let _guard = mapper.shards.write();
1537        let shard = mapper.select_shard(0xDEAD_BEEF);
1538        assert!(shard < 2, "select_shard must return one of [0, 1]");
1539    }
1540
1541    /// `select_shard`'s candidate-index computation must
1542    /// be unbiased across the u64 hash space. Pre-fix used
1543    /// `hash as usize % candidates.len()`, which over-weights low
1544    /// indices when `candidates.len()` is not a power of two.
1545    /// With u64 hashes the bias is small but non-zero and
1546    /// sustains a hot-shard skew over time. Lemire's
1547    /// `(hash * len) >> 64` is unbiased.
1548    ///
1549    /// We test the unbiased property by sampling a uniform
1550    /// distribution of u64 hashes over 3 candidate shards and
1551    /// asserting each bucket gets close to 1/3 of the picks.
1552    /// Empirical bound: ±5% across 30 000 trials with a
1553    /// well-distributed input.
1554    #[test]
1555    fn select_shard_distribution_is_unbiased() {
1556        // 3 candidates: a non-power-of-2 to expose the modulo
1557        // bias the fix removes.
1558        let mapper = ShardMapper::new(3, 1024, ScalingPolicy::default()).unwrap();
1559
1560        let trials = 30_000u64;
1561        // Spread inputs uniformly across the u64 range so the
1562        // multiply-shift mapping behaves as designed.
1563        let stride = u64::MAX / trials;
1564        let mut counts = [0u64; 3];
1565        for i in 0..trials {
1566            let h = i.wrapping_mul(stride);
1567            let id = mapper.select_shard(h);
1568            counts[id as usize] += 1;
1569        }
1570
1571        let expected = (trials / 3) as i64;
1572        for (id, &count) in counts.iter().enumerate() {
1573            let diff = (count as i64 - expected).abs();
1574            let pct = (diff as f64 / expected as f64) * 100.0;
1575            assert!(
1576                pct < 5.0,
1577                "shard {} bucket has {} hits ({:.2}% off expected {}); \
1578                 modulo bias would drift higher on certain shards",
1579                id,
1580                count,
1581                pct,
1582                expected
1583            );
1584        }
1585    }
1586
1587    #[test]
1588    fn test_scale_up() {
1589        // Explicitly set max_shards to allow scaling from 2 to 4
1590        let policy = ScalingPolicy {
1591            max_shards: 8,
1592            ..Default::default()
1593        };
1594        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1595
1596        let new_ids = mapper.scale_up(2).unwrap();
1597        assert_eq!(new_ids.len(), 2);
1598        assert_eq!(mapper.active_shard_count(), 4);
1599    }
1600
1601    #[test]
1602    fn test_scale_up_max_limit() {
1603        let policy = ScalingPolicy {
1604            max_shards: 4,
1605            ..Default::default()
1606        };
1607        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1608
1609        let result = mapper.scale_up(1);
1610        assert!(matches!(result, Err(ScalingError::AtMaxShards)));
1611    }
1612
1613    #[test]
1614    fn test_scale_down() {
1615        let policy = ScalingPolicy {
1616            min_shards: 1,
1617            cooldown: Duration::from_nanos(1), // Disable cooldown for test
1618            ..Default::default()
1619        };
1620        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1621
1622        let drained = mapper.scale_down(2).unwrap();
1623        assert_eq!(drained.len(), 2);
1624        assert_eq!(mapper.active_shard_count(), 2);
1625    }
1626
1627    #[test]
1628    fn test_scale_down_min_limit() {
1629        let policy = ScalingPolicy {
1630            min_shards: 4,
1631            ..Default::default()
1632        };
1633        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1634
1635        let result = mapper.scale_down(1);
1636        assert!(matches!(result, Err(ScalingError::AtMinShards)));
1637    }
1638
1639    #[test]
1640    fn test_metrics_collection() {
1641        let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1642
1643        // Record some metrics
1644        if let Some(collector) = mapper.metrics_collector(0) {
1645            collector.record_buffer_len(512);
1646            collector.record_push(5);
1647            collector.record_push(10);
1648        }
1649
1650        let metrics = mapper.collect_metrics();
1651        assert_eq!(metrics.len(), 2);
1652
1653        let shard0_metrics = metrics.iter().find(|m| m.shard_id == 0).unwrap();
1654        assert!(shard0_metrics.fill_ratio > 0.0);
1655    }
1656
1657    /// Regression: a `record_push` / `record_flush` interleaving
1658    /// with a `collect_and_reset` swap must NOT desync `(sum,
1659    /// count)`. Pre-fix `push_latency_sum_ns` and `push_count`
1660    /// were independent atomics; a tick between the two
1661    /// `fetch_add`s captured the sum without the matching count
1662    /// (or the count without the sum). The resulting `avg =
1663    /// sum.checked_div(count).unwrap_or(0)` returned 0 in window
1664    /// N (sum without count) AND 0 in window N+1 (count without
1665    /// sum) — silently zeroing the average that drives
1666    /// `evaluate_scaling`'s push-latency scale-up trigger.
1667    ///
1668    /// Post-fix `(sum, count)` is packed into one
1669    /// `AtomicU64` so the swap captures both atomically. This
1670    /// test fires N concurrent `record_push` calls and a single
1671    /// `collect_and_reset` and asserts the captured count
1672    /// matches the captured sum (i.e. `sum >= count` because
1673    /// every push contributes at least 1 ns; `sum / count` is
1674    /// well-defined for any non-zero count).
1675    #[test]
1676    fn record_push_collect_no_sum_count_desync() {
1677        use std::sync::Barrier;
1678        use std::thread;
1679
1680        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1681        const PUSHERS: usize = 4;
1682        const PUSHES_PER_THREAD: usize = 1_000;
1683
1684        let barrier = Arc::new(Barrier::new(PUSHERS + 1));
1685        let mut handles = vec![];
1686        for _ in 0..PUSHERS {
1687            let c = collector.clone();
1688            let b = barrier.clone();
1689            handles.push(thread::spawn(move || {
1690                b.wait();
1691                for i in 0..PUSHES_PER_THREAD {
1692                    // Vary latencies so sum/count averages
1693                    // aren't trivially the same number.
1694                    c.record_push((i as u64 % 100) + 1);
1695                }
1696            }));
1697        }
1698
1699        // Race a collect_and_reset across the pushers.
1700        barrier.wait();
1701        let snapshot1 = collector.collect_and_reset();
1702        for h in handles {
1703            h.join().unwrap();
1704        }
1705        // After all threads finish, drain whatever remains.
1706        let snapshot2 = collector.collect_and_reset();
1707
1708        // Reconstruct count and sum from the two snapshots.
1709        // We can't easily expose the packed atomic, so we use
1710        // the per-window averages and event counts as a
1711        // consistency check.
1712        let total_events = snapshot1.event_rate + snapshot2.event_rate;
1713        assert_eq!(
1714            total_events as usize,
1715            PUSHERS * PUSHES_PER_THREAD,
1716            "all pushes must be accounted for"
1717        );
1718
1719        // For each window: if event_rate > 0, avg_push_latency
1720        // must be > 0 (non-zero average) — pre-fix the desync
1721        // could land event_rate > 0 with avg = 0 (count
1722        // captured without sum, or sum without count → div-by-
1723        // zero clamped to 0). This is the directly visible
1724        // symptom of the desync.
1725        //
1726        // events_in_window is incremented separately from the
1727        // packed (sum, count) word, so a strict assertion of
1728        // "event_rate is exactly count" isn't safe — but the
1729        // weaker invariant "if any pushes were captured in the
1730        // (sum,count) word, the average is non-zero" survives
1731        // the packed-atomic fix.
1732        for snap in [&snapshot1, &snapshot2] {
1733            if snap.avg_push_latency_ns == 0 {
1734                // Either no pushes were captured in this
1735                // window, or — pre-fix — sum/count desynced.
1736                // The post-fix shape can only produce
1737                // avg=0 when count is also 0; we can't read
1738                // count directly from ShardMetrics, but
1739                // exercise the invariant by confirming the
1740                // OTHER window's sum is consistent with all
1741                // pushes.
1742                continue;
1743            }
1744            assert!(
1745                snap.avg_push_latency_ns >= 1,
1746                "regression: a window with non-zero avg must have \
1747                 a positive sum (pre-fix sum-without-count desync \
1748                 produced avg=0 with non-zero events)"
1749            );
1750        }
1751    }
1752
1753    #[test]
1754    fn test_draining_excludes_from_selection() {
1755        let policy = ScalingPolicy {
1756            min_shards: 1,
1757            cooldown: Duration::from_nanos(1),
1758            ..Default::default()
1759        };
1760        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1761
1762        // Drain one shard
1763        let drained = mapper.scale_down(1).unwrap();
1764        assert_eq!(drained.len(), 1);
1765
1766        // All selections should go to the remaining active shard
1767        let active_ids = mapper.active_shard_ids();
1768        assert_eq!(active_ids.len(), 1);
1769
1770        for i in 0..100u64 {
1771            let selected = mapper.select_shard(i);
1772            assert!(active_ids.contains(&selected));
1773        }
1774    }
1775
1776    #[test]
1777    fn test_policy_validation() {
1778        let invalid_policy = ScalingPolicy {
1779            fill_ratio_threshold: 1.5, // Invalid
1780            ..Default::default()
1781        };
1782        assert!(invalid_policy.validate().is_err());
1783
1784        // Without normalize(), this would be invalid
1785        let invalid_policy2 = ScalingPolicy {
1786            min_shards: 10,
1787            max_shards: 5,
1788            ..Default::default()
1789        };
1790        assert!(invalid_policy2.validate().is_err());
1791    }
1792
1793    #[test]
1794    fn test_policy_normalize_auto_adjusts_max_shards() {
1795        // When min_shards > max_shards, normalize() should adjust max_shards
1796        let policy = ScalingPolicy {
1797            min_shards: 8,
1798            max_shards: 2, // Less than min_shards
1799            ..Default::default()
1800        };
1801
1802        let normalized = policy.normalize();
1803        assert_eq!(
1804            normalized.max_shards, 8,
1805            "max_shards should be adjusted to min_shards"
1806        );
1807        assert!(
1808            normalized.validate().is_ok(),
1809            "normalized policy should be valid"
1810        );
1811    }
1812
1813    /// Regression: BUG_REPORT.md #7 — `scale_up` previously allocated
1814    /// new shard ids as `shards.iter().max() + 1`, which reused ids
1815    /// after the highest-numbered shard was drained-and-removed.
1816    /// Reusing ids merges two distinct shard lifetimes in any
1817    /// external metric/checkpoint system that keys on shard id.
1818    /// The fix uses a monotonic `next_shard_id` counter.
1819    #[test]
1820    fn scale_up_does_not_reuse_ids_after_remove() {
1821        let policy = ScalingPolicy {
1822            min_shards: 1,
1823            max_shards: 16,
1824            cooldown: Duration::from_nanos(1),
1825            ..Default::default()
1826        };
1827        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1828
1829        // Initial ids are 0 and 1. Scale up to 4 — new ids must be
1830        // 2 and 3 (the next two slots from the monotonic counter).
1831        let new_ids = mapper.scale_up(2).unwrap();
1832        assert_eq!(new_ids, vec![2, 3]);
1833
1834        // Drain one shard. `scale_down` picks by lowest weight, so
1835        // we don't get to choose which id is drained. Whichever it
1836        // is, we then force it through Stopped + remove and check
1837        // that the removed id is *not* reissued by the next
1838        // scale_up.
1839        let drained = mapper.scale_down(1).unwrap();
1840        let drained_id = drained[0];
1841        for shard in mapper.shards.write().iter_mut() {
1842            if shard.id == drained_id {
1843                shard.state = ShardState::Stopped;
1844            }
1845        }
1846        let removed = mapper.remove_stopped_shards();
1847        assert_eq!(removed, vec![drained_id]);
1848
1849        // Scale up by 1. The broken `max(existing_ids) + 1` allocator
1850        // could revive the just-removed id whenever `drained_id` had
1851        // been the highest id present (e.g. id 3 with 0/1/2/3 → 0/1/2
1852        // → next would be 3 again). The fix uses the monotonic
1853        // counter, which sits at 4 after the earlier scale_up, so
1854        // the new id must be 4 regardless of which id was drained.
1855        let new_ids = mapper.scale_up(1).unwrap();
1856        assert_eq!(
1857            new_ids,
1858            vec![4],
1859            "shard id {drained_id} was just removed; reusing any \
1860             previously-issued id would merge two distinct shard \
1861             lifetimes in external systems"
1862        );
1863    }
1864
1865    #[test]
1866    fn test_policy_normalize_preserves_valid_config() {
1867        // When max_shards >= min_shards, normalize() should not change anything
1868        let policy = ScalingPolicy {
1869            min_shards: 4,
1870            max_shards: 16,
1871            ..Default::default()
1872        };
1873
1874        let normalized = policy.normalize();
1875        assert_eq!(normalized.min_shards, 4);
1876        assert_eq!(normalized.max_shards, 16);
1877    }
1878
1879    #[test]
1880    fn test_shard_mapper_normalizes_policy() {
1881        // ShardMapper should accept a policy where min_shards > default max_shards
1882        // because it calls normalize() internally
1883        let policy = ScalingPolicy {
1884            min_shards: 4,
1885            ..Default::default()
1886        };
1887
1888        // This should succeed even on machines with < 4 CPUs
1889        let result = ShardMapper::new(4, 1024, policy);
1890        assert!(
1891            result.is_ok(),
1892            "ShardMapper should normalize policy automatically"
1893        );
1894    }
1895
1896    #[test]
1897    fn test_shard_mapper_adjusts_max_shards_to_initial_count() {
1898        // ShardMapper should adjust max_shards to accommodate initial_shards
1899        // even if initial_shards > default max_shards (CPU count)
1900        let policy = ScalingPolicy::default();
1901
1902        // Create mapper with 8 initial shards - should work even on 2-core machines
1903        let result = ShardMapper::new(8, 1024, policy);
1904        assert!(
1905            result.is_ok(),
1906            "ShardMapper should adjust max_shards to initial_shards"
1907        );
1908
1909        let mapper = result.unwrap();
1910        assert_eq!(mapper.active_shard_count(), 8);
1911
1912        // Verify the policy was adjusted
1913        assert!(
1914            mapper.policy().max_shards >= 8,
1915            "max_shards should be at least initial_shards"
1916        );
1917    }
1918
1919    #[test]
1920    fn test_scale_up_max_shards_concurrent() {
1921        use std::sync::Arc;
1922        use std::thread;
1923
1924        let policy = ScalingPolicy {
1925            max_shards: 10,
1926            cooldown: Duration::from_nanos(1), // Disable cooldown for test
1927            ..Default::default()
1928        };
1929        let mapper = Arc::new(ShardMapper::new(5, 1024, policy).unwrap());
1930
1931        // Spawn multiple threads that all try to scale up
1932        let mut handles = vec![];
1933        for _ in 0..5 {
1934            let mapper_clone = mapper.clone();
1935            handles.push(thread::spawn(move || mapper_clone.scale_up(3)));
1936        }
1937
1938        // Collect results
1939        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1940
1941        // Some should succeed, some should fail with AtMaxShards
1942        let successes: Vec<_> = results.iter().filter(|r| r.is_ok()).collect();
1943        let failures: Vec<_> = results
1944            .iter()
1945            .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
1946            .collect();
1947
1948        // We started with 5, max is 10, each tries to add 3
1949        // At most we can add 5 more shards, so at most 1-2 can succeed
1950        assert!(
1951            !successes.is_empty() || !failures.is_empty(),
1952            "at least some operations should complete"
1953        );
1954
1955        // Final count should not exceed max_shards
1956        assert!(
1957            mapper.active_shard_count() <= 10,
1958            "should never exceed max_shards, got {}",
1959            mapper.active_shard_count()
1960        );
1961    }
1962
1963    /// Multiple `scale_up_provisioning(1)` calls must
1964    /// never push `active_count` past `max_shards` via subsequent
1965    /// `activate()` calls. Pre-fix the budget gate
1966    /// (`check_scale_up_budget`) only counted ALREADY-active
1967    /// shards, so several `scale_up_provisioning` calls could each
1968    /// pass and then each `activate()` unconditionally bumped
1969    /// `active_count` past the cap.
1970    ///
1971    /// Setup: at the budget edge, allocate two Provisioning shards
1972    /// in sequence (both pass the gate because `active_count`
1973    /// hasn't moved). The first `activate()` fills the budget;
1974    /// the second must surface `AtMaxShards` rather than overflow.
1975    #[test]
1976    fn activate_rejects_when_active_count_would_exceed_max_shards() {
1977        let policy = ScalingPolicy {
1978            min_shards: 1,
1979            max_shards: 4,
1980            cooldown: Duration::from_nanos(1),
1981            ..Default::default()
1982        };
1983        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1984        // active_count = 3, max = 4. Allocate TWO Provisioning
1985        // shards; both pass the budget gate (it sees active_count=3).
1986        let ids_a = mapper.scale_up_provisioning(1).unwrap();
1987        // The 1ns cooldown elapses on every realistic scheduler
1988        // tick; if we lose that race, retry. Release-mode and
1989        // some debug-build paths occasionally finish the first
1990        // allocation in <1ns of wall time.
1991        let ids_b = loop {
1992            match mapper.scale_up_provisioning(1) {
1993                Ok(v) => break v,
1994                Err(ScalingError::InCooldown) => {
1995                    std::thread::sleep(Duration::from_nanos(50));
1996                }
1997                Err(e) => panic!("unexpected error from scale_up_provisioning: {:?}", e),
1998            }
1999        };
2000        assert_eq!(ids_a.len(), 1);
2001        assert_eq!(ids_b.len(), 1);
2002
2003        // First activate succeeds (active_count goes 3 → 4 = max).
2004        mapper
2005            .activate(ids_a[0])
2006            .expect("first activate must succeed");
2007        assert_eq!(mapper.active_shard_count(), 4);
2008
2009        // Second activate must REFUSE — it would push past max.
2010        let err = mapper
2011            .activate(ids_b[0])
2012            .expect_err("second activate must reject");
2013        assert!(
2014            matches!(err, ScalingError::AtMaxShards),
2015            "expected AtMaxShards, got {:?}",
2016            err
2017        );
2018        // active_count must still be at the cap, not over it.
2019        assert_eq!(mapper.active_shard_count(), 4);
2020    }
2021
2022    /// Pin: under contention, the active_count never transiently
2023    /// exceeds `max_shards`. Pre-fix `activate` released the
2024    /// shards write-lock BEFORE the `fetch_add(1)`, so two
2025    /// concurrent activators could each pass the budget gate
2026    /// (both reading the pre-bump count) and both bump,
2027    /// overshooting the cap by 1 at any observation point.
2028    #[test]
2029    fn concurrent_activate_never_exceeds_max_shards() {
2030        use std::sync::Arc;
2031        use std::sync::Barrier;
2032        use std::thread;
2033
2034        const ITERATIONS: usize = 200;
2035
2036        for iter in 0..ITERATIONS {
2037            let policy = ScalingPolicy {
2038                min_shards: 1,
2039                max_shards: 4,
2040                cooldown: Duration::from_nanos(1),
2041                ..Default::default()
2042            };
2043            // Start at active=3, allocate two Provisioning ids so
2044            // both threads have a candidate to activate. With the
2045            // pre-fix race: thread A loads count=3, validates,
2046            // sets state Active, drops lock; thread B loads count
2047            // (still 3), validates, sets state Active, drops lock;
2048            // A bumps → 4; B bumps → 5. Post-fix B observes
2049            // count=4 inside its lock and rejects with
2050            // AtMaxShards.
2051            let mapper = Arc::new(ShardMapper::new(3, 1024, policy).unwrap());
2052            let ids_a = mapper.scale_up_provisioning(1).unwrap();
2053            // The 1ns cooldown elapses on every realistic
2054            // scheduler tick; if we lose that race, retry once
2055            // (release-mode iterations occasionally finish the
2056            // first allocation in <1ns of wall time).
2057            let ids_b = loop {
2058                match mapper.scale_up_provisioning(1) {
2059                    Ok(v) => break v,
2060                    Err(ScalingError::InCooldown) => {
2061                        std::thread::sleep(Duration::from_micros(10));
2062                        continue;
2063                    }
2064                    Err(e) => panic!("unexpected error: {:?}", e),
2065                }
2066            };
2067            assert_eq!(ids_a.len(), 1);
2068            assert_eq!(ids_b.len(), 1);
2069
2070            let barrier = Arc::new(Barrier::new(2));
2071            let m1 = mapper.clone();
2072            let m2 = mapper.clone();
2073            let b1 = barrier.clone();
2074            let b2 = barrier.clone();
2075            let id_a = ids_a[0];
2076            let id_b = ids_b[0];
2077
2078            let h1 = thread::spawn(move || {
2079                b1.wait();
2080                m1.activate(id_a)
2081            });
2082            let h2 = thread::spawn(move || {
2083                b2.wait();
2084                m2.activate(id_b)
2085            });
2086            let r1 = h1.join().expect("thread A panicked");
2087            let r2 = h2.join().expect("thread B panicked");
2088
2089            // Exactly one must succeed and one must reject with
2090            // AtMaxShards.
2091            let ok_count = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
2092            let at_max_count = [&r1, &r2]
2093                .iter()
2094                .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
2095                .count();
2096            assert_eq!(
2097                ok_count, 1,
2098                "iter {}: expected exactly one Ok, got r1={:?} r2={:?}",
2099                iter, r1, r2
2100            );
2101            assert_eq!(
2102                at_max_count, 1,
2103                "iter {}: expected exactly one AtMaxShards, got r1={:?} r2={:?}",
2104                iter, r1, r2
2105            );
2106
2107            // active_count must not exceed max_shards at any
2108            // observation point.
2109            assert!(
2110                mapper.active_shard_count() <= 4,
2111                "iter {}: active_count={} exceeded max_shards=4",
2112                iter,
2113                mapper.active_shard_count(),
2114            );
2115        }
2116    }
2117
2118    /// Two concurrent `scale_up(1)` calls must never both succeed
2119    /// inside a single cooldown window. Before the fix, the
2120    /// cooldown check happened only under a read lock that was
2121    /// released *before* the mutating write lock, so two threads
2122    /// could both observe `last_scaling=None` (or stale), both
2123    /// acquire the write lock in turn, and both succeed — racing
2124    /// past the cooldown floor and (on a max-shard-bounded
2125    /// scenario) potentially the `max_shards` cap.
2126    ///
2127    /// Pin: across `ITERATIONS` rounds of two-thread races, every
2128    /// iteration sees exactly one success and one `InCooldown`.
2129    #[test]
2130    fn cooldown_is_enforced_under_concurrent_scale_up() {
2131        use std::sync::Arc;
2132        use std::sync::Barrier;
2133        use std::thread;
2134
2135        const ITERATIONS: usize = 1_000;
2136
2137        for iter in 0..ITERATIONS {
2138            let policy = ScalingPolicy {
2139                min_shards: 1,
2140                max_shards: 16,
2141                // Large cooldown so a single iteration can't pass
2142                // the floor between the two threads' calls.
2143                cooldown: Duration::from_secs(60),
2144                ..Default::default()
2145            };
2146            let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
2147            let barrier = Arc::new(Barrier::new(2));
2148
2149            let m1 = mapper.clone();
2150            let b1 = barrier.clone();
2151            let m2 = mapper.clone();
2152            let b2 = barrier.clone();
2153
2154            let h1 = thread::spawn(move || {
2155                b1.wait();
2156                m1.scale_up(1)
2157            });
2158            let h2 = thread::spawn(move || {
2159                b2.wait();
2160                m2.scale_up(1)
2161            });
2162
2163            let r1 = h1.join().unwrap();
2164            let r2 = h2.join().unwrap();
2165
2166            let oks = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
2167            let cooldowns = [&r1, &r2]
2168                .iter()
2169                .filter(|r| matches!(r, Err(ScalingError::InCooldown)))
2170                .count();
2171
2172            assert_eq!(
2173                oks, 1,
2174                "iter {iter}: expected exactly one Ok, got r1={r1:?}, r2={r2:?}"
2175            );
2176            assert_eq!(
2177                cooldowns, 1,
2178                "iter {iter}: expected exactly one InCooldown, got r1={r1:?}, r2={r2:?}"
2179            );
2180            assert_eq!(
2181                mapper.active_shard_count(),
2182                3,
2183                "iter {iter}: cooldown violated — both calls mutated state (shard count {})",
2184                mapper.active_shard_count()
2185            );
2186        }
2187    }
2188
2189    #[test]
2190    fn test_scale_up_overflow_protection() {
2191        // Create mapper with a high starting shard ID to test overflow
2192        // protection. The monotonic id allocator advances by 1 per
2193        // shard regardless of which shards have been removed, so we
2194        // bump `next_shard_id` directly to simulate a near-`u16::MAX`
2195        // state.
2196        let policy = ScalingPolicy {
2197            max_shards: u16::MAX,
2198            ..Default::default()
2199        };
2200        let mapper = ShardMapper::new(1, 1024, policy).unwrap();
2201
2202        // Position the allocator so the next id is u16::MAX - 1.
2203        // Trying to allocate 3 ids would need {MAX-1, MAX, MAX+1};
2204        // the last is unrepresentable in `u16`, so scale_up rejects.
2205        mapper
2206            .next_shard_id
2207            .store(u16::MAX - 1, AtomicOrdering::Relaxed);
2208
2209        let result = mapper.scale_up(3);
2210        assert!(matches!(result, Err(ScalingError::AtMaxShards)));
2211
2212        // Adding 1 shard should still work (id = MAX - 1).
2213        let result = mapper.scale_up(1);
2214        assert!(result.is_ok());
2215    }
2216
2217    #[test]
2218    fn test_evaluate_scaling_auto_scale_disabled() {
2219        let policy = ScalingPolicy {
2220            auto_scale: false,
2221            ..Default::default()
2222        };
2223        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2224
2225        let decision = mapper.evaluate_scaling();
2226        assert!(matches!(decision, ScalingDecision::None));
2227    }
2228
2229    #[test]
2230    fn test_evaluate_scaling_in_cooldown() {
2231        let policy = ScalingPolicy {
2232            cooldown: Duration::from_secs(60),
2233            ..Default::default()
2234        };
2235        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2236
2237        // Trigger a scaling operation to start cooldown
2238        *mapper.last_scaling.write() = Some(Instant::now());
2239
2240        let decision = mapper.evaluate_scaling();
2241        assert!(matches!(decision, ScalingDecision::None));
2242    }
2243
2244    #[test]
2245    fn test_evaluate_scaling_scale_up_on_high_fill_ratio() {
2246        let policy = ScalingPolicy {
2247            fill_ratio_threshold: 0.7,
2248            max_shards: 8,
2249            cooldown: Duration::from_nanos(1),
2250            ..Default::default()
2251        };
2252        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2253
2254        // Set high fill ratio on majority of shards
2255        {
2256            let mut shards = mapper.shards.write();
2257            for shard in shards.iter_mut() {
2258                shard.last_metrics.fill_ratio = 0.9; // Above threshold
2259            }
2260        }
2261
2262        let decision = mapper.evaluate_scaling();
2263        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2264    }
2265
2266    #[test]
2267    fn test_evaluate_scaling_scale_up_on_high_latency() {
2268        let policy = ScalingPolicy {
2269            push_latency_threshold_ns: 10,
2270            max_shards: 8,
2271            cooldown: Duration::from_nanos(1),
2272            ..Default::default()
2273        };
2274        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2275
2276        // Set high push latency on majority of shards
2277        {
2278            let mut shards = mapper.shards.write();
2279            for shard in shards.iter_mut() {
2280                shard.last_metrics.avg_push_latency_ns = 100; // Above threshold
2281            }
2282        }
2283
2284        let decision = mapper.evaluate_scaling();
2285        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2286    }
2287
2288    #[test]
2289    fn test_evaluate_scaling_scale_down_on_underutilized() {
2290        let policy = ScalingPolicy {
2291            underutilized_threshold: 0.2,
2292            min_shards: 2,
2293            cooldown: Duration::from_nanos(1),
2294            ..Default::default()
2295        };
2296        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2297
2298        // Set low fill ratio and zero event rate on majority of shards
2299        {
2300            let mut shards = mapper.shards.write();
2301            for shard in shards.iter_mut() {
2302                shard.last_metrics.fill_ratio = 0.05; // Below threshold
2303                shard.last_metrics.event_rate = 0;
2304            }
2305        }
2306
2307        let decision = mapper.evaluate_scaling();
2308        assert!(matches!(decision, ScalingDecision::ScaleDown(_)));
2309    }
2310
2311    /// Regression: a freshly-activated shard must NOT
2312    /// immediately count toward the underutilized tally on the
2313    /// next `evaluate_scaling`. Pre-fix the new shard's
2314    /// `last_metrics` was the `ShardMetrics::new(id)` placeholder
2315    /// (`fill_ratio = 0.0, event_rate = 0`), which matched the
2316    /// underutilized trigger immediately — oscillating the
2317    /// system: scale-up → next tick scale-down → next tick
2318    /// scale-up.
2319    ///
2320    /// Post-fix `MappedShard.activated_at` is stamped at the
2321    /// Provisioning → Active transition (and at scale-up
2322    /// construction for `Active`-from-create shards), and
2323    /// `evaluate_scaling` skips shards within `policy.cooldown`
2324    /// of activation.
2325    #[test]
2326    fn freshly_added_shard_skipped_from_evaluate_scaling_warmup() {
2327        // Short cooldown so we can pin "outside warmup" with a
2328        // small `checked_sub`-safe offset (200ms). Production
2329        // stamps boot shards 1 hour in the past, but that
2330        // subtraction falls back to `Instant::now()` on hosts
2331        // with sub-1h uptime (Windows Instant is bounded by
2332        // boot), making all shards land inside the warmup
2333        // window. We override both boot shards explicitly so
2334        // the test is independent of system uptime.
2335        let policy = ScalingPolicy {
2336            underutilized_threshold: 0.2,
2337            min_shards: 1,
2338            cooldown: Duration::from_millis(100),
2339            ..Default::default()
2340        };
2341        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2342
2343        // Direct manipulation of the shard list: pin two boot
2344        // shards as underutilized AND safely outside the warmup
2345        // window, and inject one freshly-activated shard with
2346        // `activated_at = now()` whose placeholder metrics ALSO
2347        // trigger the underutilized predicate. Without the
2348        // warmup skip, the count would be 3 of 3 shards
2349        // underutilized → scale-down. WITH the warmup skip,
2350        // only the 2 boot shards count, and 2 of 3 still
2351        // exceeds the 3/2 = 1 majority — scale-down still fires
2352        // (driven by the boot shards), but the decisive property
2353        // is that the fresh shard is correctly excluded.
2354        let old_ts = Instant::now()
2355            .checked_sub(Duration::from_millis(200))
2356            .expect("test host uptime should exceed 200ms");
2357        {
2358            let mut shards = mapper.shards.write();
2359            for shard in shards.iter_mut() {
2360                shard.last_metrics.fill_ratio = 0.05;
2361                shard.last_metrics.event_rate = 0;
2362                // Pin boot shards safely outside the cooldown,
2363                // independent of production's 1-hour stamp.
2364                shard.activated_at = old_ts;
2365            }
2366            // The third shard is "fresh": stamp activated_at to
2367            // now, simulating a just-activated shard.
2368            shards[2].activated_at = Instant::now();
2369        }
2370
2371        // The fresh shard must satisfy the warmup predicate.
2372        let now = Instant::now();
2373        let warmup_excluded: Vec<u16> = mapper
2374            .shards
2375            .read()
2376            .iter()
2377            .filter(|s| now.duration_since(s.activated_at) < mapper.policy.cooldown)
2378            .map(|s| s.id)
2379            .collect();
2380        assert_eq!(
2381            warmup_excluded,
2382            vec![2u16],
2383            "regression: only shard id 2 (just-stamped) should be \
2384             within the warmup window; boot shards are stamped \
2385             200ms in the past (well outside the 100ms cooldown)"
2386        );
2387
2388        // And evaluate_scaling must still produce ScaleDown
2389        // (driven by the 2 boot shards) — the fresh shard's
2390        // placeholder doesn't get to vote.
2391        let decision = mapper.evaluate_scaling();
2392        assert!(
2393            matches!(decision, ScalingDecision::ScaleDown(_)),
2394            "scale-down still fires from the boot shards' real \
2395             underutilization, but driven by 2 of 3 not 3 of 3 — \
2396             got {:?}",
2397            decision,
2398        );
2399    }
2400
2401    #[test]
2402    fn test_evaluate_scaling_no_scale_up_at_max() {
2403        let policy = ScalingPolicy {
2404            fill_ratio_threshold: 0.7,
2405            max_shards: 4, // Already at max
2406            cooldown: Duration::from_nanos(1),
2407            ..Default::default()
2408        };
2409        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2410
2411        // Set high fill ratio
2412        {
2413            let mut shards = mapper.shards.write();
2414            for shard in shards.iter_mut() {
2415                shard.last_metrics.fill_ratio = 0.9;
2416            }
2417        }
2418
2419        let decision = mapper.evaluate_scaling();
2420        assert!(matches!(decision, ScalingDecision::None));
2421    }
2422
2423    #[test]
2424    fn test_evaluate_scaling_no_scale_down_at_min() {
2425        let policy = ScalingPolicy {
2426            underutilized_threshold: 0.2,
2427            min_shards: 4, // Already at min
2428            cooldown: Duration::from_nanos(1),
2429            ..Default::default()
2430        };
2431        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2432
2433        // Set underutilized metrics
2434        {
2435            let mut shards = mapper.shards.write();
2436            for shard in shards.iter_mut() {
2437                shard.last_metrics.fill_ratio = 0.05;
2438                shard.last_metrics.event_rate = 0;
2439            }
2440        }
2441
2442        let decision = mapper.evaluate_scaling();
2443        assert!(matches!(decision, ScalingDecision::None));
2444    }
2445
2446    #[test]
2447    fn test_evaluate_scaling_ignores_draining_shards() {
2448        let policy = ScalingPolicy {
2449            fill_ratio_threshold: 0.7,
2450            max_shards: 8,
2451            cooldown: Duration::from_nanos(1),
2452            ..Default::default()
2453        };
2454        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2455
2456        // Set high fill ratio but mark shards as draining
2457        {
2458            let mut shards = mapper.shards.write();
2459            for shard in shards.iter_mut() {
2460                shard.last_metrics.fill_ratio = 0.9;
2461                shard.state = ShardState::Draining;
2462            }
2463        }
2464
2465        // Should not scale up since draining shards are ignored
2466        let decision = mapper.evaluate_scaling();
2467        assert!(matches!(decision, ScalingDecision::None));
2468    }
2469
2470    #[test]
2471    fn test_shard_metrics_new() {
2472        let metrics = ShardMetrics::new(5);
2473        assert_eq!(metrics.shard_id, 5);
2474        assert_eq!(metrics.fill_ratio, 0.0);
2475        assert_eq!(metrics.event_rate, 0);
2476        assert!(!metrics.draining);
2477    }
2478
2479    #[test]
2480    fn test_shard_metrics_compute_weight() {
2481        let mut metrics = ShardMetrics::new(0);
2482        metrics.fill_ratio = 0.5;
2483        metrics.avg_push_latency_ns = 100;
2484        metrics.event_rate = 1_000_000;
2485
2486        metrics.compute_weight();
2487        assert!(metrics.weight > 0.0);
2488    }
2489
2490    #[test]
2491    fn test_shard_metrics_draining_max_weight() {
2492        let mut metrics = ShardMetrics::new(0);
2493        metrics.draining = true;
2494        metrics.compute_weight();
2495        assert_eq!(metrics.weight, f64::MAX);
2496    }
2497
2498    #[test]
2499    fn test_scaling_decision_debug() {
2500        let none = ScalingDecision::None;
2501        let up = ScalingDecision::ScaleUp(2);
2502        let down = ScalingDecision::ScaleDown(1);
2503
2504        assert!(format!("{:?}", none).contains("None"));
2505        assert!(format!("{:?}", up).contains("ScaleUp"));
2506        assert!(format!("{:?}", down).contains("ScaleDown"));
2507    }
2508
2509    /// Regression: BUG_REPORT.md #46 — `scale_up_provisioning`
2510    /// allocates a shard but `select_shard` must NOT route to it
2511    /// until `activate` has been called. This is the load-bearing
2512    /// invariant that lets `EventBus::add_shard_internal` wire up
2513    /// drain workers before producers can land in the new ring
2514    /// buffer.
2515    #[test]
2516    fn provisioning_shard_is_not_selectable_until_activated() {
2517        let policy = ScalingPolicy {
2518            min_shards: 1,
2519            max_shards: 16,
2520            cooldown: Duration::from_nanos(1),
2521            ..Default::default()
2522        };
2523        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2524
2525        // Allocate a provisioning shard. Existing ids are 0,1; new
2526        // is 2.
2527        let new_ids = mapper.scale_up_provisioning(1).unwrap();
2528        assert_eq!(new_ids, vec![2]);
2529
2530        // The provisioning shard must NOT appear in active accounting.
2531        assert_eq!(mapper.active_shard_count(), 2);
2532        assert_eq!(mapper.shard_state(2), Some(ShardState::Provisioning));
2533
2534        // Across many hashes, `select_shard` must never pick id 2.
2535        // Spread the input hashes across the u64 range so the
2536        // unbiased Lemire-style mapping actually
2537        // distributes — sequential small ids would all map to
2538        // index 0 because `(small * len) >> 64 = 0`. Production
2539        // callers pass `xxh3_64`-hashed event payloads, which
2540        // are uniform u64s; this scaling mirrors that.
2541        let mut seen: std::collections::HashSet<u16> = std::collections::HashSet::new();
2542        let stride = u64::MAX / 10_000;
2543        for i in 0u64..10_000 {
2544            seen.insert(mapper.select_shard(i.wrapping_mul(stride)));
2545        }
2546        assert!(
2547            !seen.contains(&2),
2548            "provisioning shard 2 was selected — \
2549             producers would push into a ring buffer with no consumer (#46)"
2550        );
2551        assert!(seen == [0, 1].into_iter().collect());
2552
2553        // After `activate`, the shard is selectable.
2554        mapper.activate(2).unwrap();
2555        assert_eq!(mapper.shard_state(2), Some(ShardState::Active));
2556        assert_eq!(mapper.active_shard_count(), 3);
2557
2558        let mut seen_after: std::collections::HashSet<u16> = std::collections::HashSet::new();
2559        let stride = u64::MAX / 10_000;
2560        for i in 0u64..10_000 {
2561            seen_after.insert(mapper.select_shard(i.wrapping_mul(stride)));
2562        }
2563        assert!(
2564            seen_after.contains(&2),
2565            "after activate, shard 2 should be a valid select_shard target"
2566        );
2567    }
2568
2569    /// Regression: BUG_REPORT.md #51 — when no `Active` shard exists
2570    /// (e.g. all shards are Draining or Provisioning), `select_shard`
2571    /// must NOT fall back to a Draining shard. Pushing into a
2572    /// draining shard increments `pushes_since_drain_start` and
2573    /// blocks finalization indefinitely.
2574    #[test]
2575    fn select_shard_does_not_fall_back_to_draining() {
2576        let policy = ScalingPolicy {
2577            min_shards: 0,
2578            max_shards: 8,
2579            cooldown: Duration::from_nanos(1),
2580            ..Default::default()
2581        };
2582        // Force min_shards = 0 via direct construction so we can drain
2583        // every shard.
2584        let mut policy = policy;
2585        policy.min_shards = 1;
2586        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2587
2588        // Force every shard into Draining state directly. We bypass
2589        // `scale_down`'s min_shards floor by mutating the test-only
2590        // accessor; this models a state that can otherwise be
2591        // reached by `drain_specific` calls.
2592        //
2593        // Backdoor mutations don't trigger the `select_shard`
2594        // selection-table rebuild that the public API does, so we
2595        // call it explicitly here to model what `drain_specific`
2596        // would have done.
2597        {
2598            let mut shards = mapper.shards.write();
2599            for s in shards.iter_mut() {
2600                s.state = ShardState::Draining;
2601                s.metrics.set_draining(true);
2602            }
2603            mapper.rebuild_selection_table_locked(&shards);
2604        }
2605
2606        // The fallback must return the OOB sentinel `u16::MAX` rather
2607        // than a draining shard id (0 or 1). The upstream
2608        // `resolve_idx` path will see no match for `u16::MAX` and
2609        // surface as `Unrouted` (#44), which is the correct signal:
2610        // "no destination, do not push."
2611        for hash in 0u64..1_000 {
2612            let picked = mapper.select_shard(hash);
2613            assert_eq!(
2614                picked,
2615                u16::MAX,
2616                "fallback returned a draining shard ({}); pushes there would \
2617                 deadlock finalize_draining (#51)",
2618                picked
2619            );
2620        }
2621    }
2622
2623    /// Regression: BUG_REPORT.md #32 — `scale_up(0)` previously
2624    /// bumped the cooldown timestamp and could spuriously fail at
2625    /// `u16::MAX` even though zero ids were being allocated.
2626    #[test]
2627    fn scale_up_zero_is_a_noop() {
2628        let policy = ScalingPolicy {
2629            cooldown: Duration::from_secs(60),
2630            ..Default::default()
2631        };
2632        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2633        // Pretend we just scaled — cooldown is active.
2634        *mapper.last_scaling.write() = Some(Instant::now());
2635
2636        // scale_up(0) must not return InCooldown and must not bump
2637        // last_scaling.
2638        let before_ts = *mapper.last_scaling.read();
2639        let r = mapper.scale_up(0);
2640        assert!(
2641            r.is_ok(),
2642            "scale_up(0) should succeed as a no-op, got {r:?}"
2643        );
2644        assert_eq!(r.unwrap().len(), 0);
2645        let after_ts = *mapper.last_scaling.read();
2646        assert_eq!(before_ts, after_ts, "scale_up(0) bumped cooldown timestamp");
2647
2648        // Also: position the allocator at u16::MAX and verify a
2649        // count==0 call doesn't trip the sentinel check.
2650        mapper
2651            .next_shard_id
2652            .store(u16::MAX, AtomicOrdering::Relaxed);
2653        assert!(mapper.scale_up(0).is_ok());
2654    }
2655
2656    /// Regression: `drain_specific` must bump `last_scaling` so
2657    /// a subsequent `scale_up` is gated by the cooldown floor.
2658    /// Pre-fix `scale_down` wrote `last_scaling` but
2659    /// `drain_specific` did not, so the sequence
2660    /// `drain_specific(id) → scale_up(N)` bypassed the cooldown
2661    /// — even though both decrement `active_count` and so
2662    /// should be symmetric from the budget-math perspective.
2663    #[test]
2664    fn drain_specific_bumps_last_scaling_so_scale_up_respects_cooldown() {
2665        let policy = ScalingPolicy {
2666            min_shards: 1,
2667            max_shards: 8,
2668            // A cooldown long enough that `Instant::now()` won't
2669            // accidentally elapse it during the test, but short
2670            // enough not to slow CI.
2671            cooldown: Duration::from_secs(60),
2672            ..Default::default()
2673        };
2674        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2675
2676        // Pre-condition: no prior scaling action.
2677        assert!(mapper.last_scaling.read().is_none());
2678
2679        let before = Instant::now();
2680        mapper.drain_specific(0).unwrap();
2681        let after_ts = mapper
2682            .last_scaling
2683            .read()
2684            .expect("drain_specific must record a `last_scaling` timestamp");
2685        // Sanity: the recorded timestamp is in the test window.
2686        assert!(
2687            after_ts >= before,
2688            "last_scaling must be bumped to a current Instant (got {:?}, before was {:?})",
2689            after_ts,
2690            before
2691        );
2692
2693        // The decisive sealed property: a follow-up scale_up
2694        // must trip the cooldown gate. Pre-fix this would have
2695        // succeeded immediately because `last_scaling` was never
2696        // written by `drain_specific`.
2697        let err = mapper
2698            .scale_up(1)
2699            .expect_err("scale_up immediately after drain_specific must hit cooldown");
2700        match err {
2701            ScalingError::InCooldown => {} // expected
2702            other => panic!("expected InCooldown after drain_specific, got {:?}", other),
2703        }
2704    }
2705
2706    /// Regression: BUG_REPORT.md #48 — `drain_specific` must
2707    /// transition the shard's `MappedShard.state` to `Draining` so
2708    /// that `select_shard` stops routing to it. The previous
2709    /// `drain_shard` only flipped a metrics atomic.
2710    #[test]
2711    fn drain_specific_takes_shard_out_of_select() {
2712        let policy = ScalingPolicy {
2713            min_shards: 1,
2714            max_shards: 8,
2715            cooldown: Duration::from_nanos(1),
2716            ..Default::default()
2717        };
2718        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2719
2720        // Drain shard 0.
2721        mapper.drain_specific(0).unwrap();
2722        assert_eq!(mapper.shard_state(0), Some(ShardState::Draining));
2723        assert_eq!(mapper.active_shard_count(), 2);
2724
2725        // Across many hashes, select_shard must not pick id 0.
2726        for hash in 0u64..10_000 {
2727            let picked = mapper.select_shard(hash);
2728            assert_ne!(
2729                picked, 0,
2730                "select_shard returned a Draining shard id 0 — \
2731                 producer pushes there would block finalize_draining (#48)"
2732            );
2733        }
2734    }
2735
2736    /// Regression: BUG_REPORT.md #33 — `set_draining(true)` and a
2737    /// concurrent `record_push` race on `pushes_since_drain_start`.
2738    /// The race itself can't be eliminated without a CAS loop on
2739    /// the counter (which would penalize the hot path), so the
2740    /// best we can pin is: after the dust settles, the counter
2741    /// value is bounded — never larger than the number of pushes
2742    /// that genuinely overlapped the transition. This catches a
2743    /// regression where the store-zero stops happening, where the
2744    /// flag publish stops happening, or where future code adds
2745    /// drift that compounds the race across many transitions.
2746    #[test]
2747    fn set_draining_resets_counter_under_concurrent_pushes() {
2748        use std::sync::Barrier;
2749        use std::thread;
2750
2751        const ITERATIONS: usize = 200;
2752        const PUSHERS: usize = 4;
2753
2754        for _ in 0..ITERATIONS {
2755            let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
2756
2757            // Sanity: counter starts at zero.
2758            assert_eq!(collector.pushes_since_drain_start(), 0);
2759
2760            // Pre-load with a noticeable amount of "before drain"
2761            // pushes so the reset has work to do.
2762            for _ in 0..50 {
2763                collector.record_push(1);
2764            }
2765            assert_eq!(collector.pushes_since_drain_start(), 50);
2766
2767            // Race: every pusher hammers `record_push` while one
2768            // thread calls `set_draining(true)`. After the barrier,
2769            // we want to observe that the counter ends up bounded
2770            // by PUSHERS (the number of pushes that genuinely
2771            // overlapped the transition) — and crucially NOT 50+,
2772            // which is what the buggy code with a missing reset
2773            // would leave behind.
2774            let barrier = Arc::new(Barrier::new(PUSHERS + 1));
2775            let mut handles = Vec::with_capacity(PUSHERS);
2776            for _ in 0..PUSHERS {
2777                let c = Arc::clone(&collector);
2778                let b = Arc::clone(&barrier);
2779                handles.push(thread::spawn(move || {
2780                    b.wait();
2781                    c.record_push(1);
2782                }));
2783            }
2784
2785            barrier.wait();
2786            collector.set_draining(true);
2787            for h in handles {
2788                h.join().unwrap();
2789            }
2790
2791            // After reset + at-most-PUSHERS racing pushes, the
2792            // counter is bounded. The strong guarantee we want is
2793            // "the 50 pre-drain pushes did NOT survive the reset."
2794            // Anything <= PUSHERS is acceptable — the race may
2795            // count any subset of the racing pushes.
2796            let final_count = collector.pushes_since_drain_start();
2797            assert!(
2798                final_count <= PUSHERS as u64,
2799                "set_draining reset is broken: counter is {} after reset, \
2800                 expected at most {} (#33)",
2801                final_count,
2802                PUSHERS
2803            );
2804        }
2805    }
2806
2807    /// Regression: BUG_REPORT.md #49 — `finalize_draining` must
2808    /// drop the `shards` write lock before calling `on_shard_removed`,
2809    /// so a callback that re-enters the mapper (read methods like
2810    /// `shard_state`, `active_shard_ids`, etc.) does not deadlock.
2811    #[test]
2812    fn finalize_draining_does_not_deadlock_on_callback_reentry() {
2813        use std::sync::Mutex;
2814        let policy = ScalingPolicy {
2815            min_shards: 1,
2816            max_shards: 8,
2817            cooldown: Duration::from_nanos(1),
2818            ..Default::default()
2819        };
2820        let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
2821
2822        // Set a callback that re-enters the mapper. Before the fix
2823        // this acquires `shards.read()` while finalize_draining
2824        // still holds `shards.write()`, deadlocking on parking_lot's
2825        // non-recursive RwLock.
2826        type Observation = (u16, Option<ShardState>);
2827        let observed_states: Arc<Mutex<Vec<Observation>>> = Arc::new(Mutex::new(Vec::new()));
2828        {
2829            let mapper_for_cb = Arc::clone(&mapper);
2830            let observed = Arc::clone(&observed_states);
2831            mapper.set_on_shard_removed(move |id| {
2832                let st = mapper_for_cb.shard_state(id);
2833                observed.lock().unwrap().push((id, st));
2834            });
2835        }
2836
2837        // Drive a shard all the way to Stopped: drain it, mark its
2838        // metrics empty (current_len = 0), and drop drain_started
2839        // far enough back that the 100ms gate is satisfied.
2840        mapper.drain_specific(0).unwrap();
2841        {
2842            let mut shards = mapper.shards.write();
2843            let s = shards.iter_mut().find(|s| s.id == 0).unwrap();
2844            s.metrics.current_len.store(0, AtomicOrdering::Relaxed);
2845            s.metrics
2846                .pushes_since_drain_start
2847                .store(0, AtomicOrdering::Relaxed);
2848            // Backdate drain_started so the elapsed > 100ms gate trips.
2849            s.drain_started = Some(Instant::now() - Duration::from_secs(1));
2850        }
2851
2852        // The call below would deadlock with the bug present.
2853        let stopped = mapper.finalize_draining();
2854        assert_eq!(stopped, vec![0]);
2855
2856        // The callback must have run AND been able to read state
2857        // (proving the lock was released).
2858        let observed = observed_states.lock().unwrap().clone();
2859        assert_eq!(observed.len(), 1);
2860        assert_eq!(observed[0].0, 0);
2861        // State should be `Stopped` (set by finalize_draining before
2862        // the lock was dropped).
2863        assert_eq!(observed[0].1, Some(ShardState::Stopped));
2864    }
2865
2866    /// `activate` must signal whether a state transition
2867    /// actually occurred so callers can avoid double-counting.
2868    /// Pre-fix it returned `Result<(), _>`, so a caller that
2869    /// invoked `activate` twice on the same shard incremented its
2870    /// own external counter (e.g. `ShardManager::num_shards`)
2871    /// twice for one logical transition.
2872    #[test]
2873    fn activate_returns_true_on_transition_and_false_on_idempotent_call() {
2874        let policy = ScalingPolicy {
2875            min_shards: 1,
2876            max_shards: 16,
2877            cooldown: Duration::from_nanos(1),
2878            ..Default::default()
2879        };
2880        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2881
2882        // Newly-provisioned shard 2 → Active: real transition.
2883        let new_ids = mapper.scale_up_provisioning(1).unwrap();
2884        assert_eq!(new_ids, vec![2]);
2885        let first = mapper.activate(2).unwrap();
2886        assert!(
2887            first,
2888            "first activate on a Provisioning shard must return true"
2889        );
2890        assert_eq!(mapper.active_shard_count(), 3);
2891
2892        // Second activate on the same already-Active shard:
2893        // idempotent, no transition.
2894        let second = mapper.activate(2).unwrap();
2895        assert!(
2896            !second,
2897            "activate on an already-Active shard must return false; \
2898             pre-fix this returned Ok(()) and the caller couldn't tell"
2899        );
2900        // active_count must NOT have moved.
2901        assert_eq!(
2902            mapper.active_shard_count(),
2903            3,
2904            "idempotent activate must not bump active_count"
2905        );
2906    }
2907}