Skip to main content

net/shard/
mapper.rs

1//! Dynamic shard mapping and scaling.
2//!
3//! This module implements dynamic shard scaling following Approach B from
4//! DYNAMIC_SHARD_SCALING.md: increasing the number of shards and rebalancing
5//! producers across them, while maintaining SPSC (single-producer single-consumer)
6//! semantics per shard.
7//!
8//! # Core Principles
9//!
10//! - Shards never accept multiple producers
11//! - Producers get moved to new shards when load increases
12//! - Ingestion performance stays at 700M ops/sec per shard
13//! - Ordering guarantees remain intact within a shard
14//! - Total throughput scales linearly with shard count
15//!
16//! # Scaling Triggers
17//!
18//! - Ring buffer fill ratio > threshold (default 70%)
19//! - Push latency exceeds threshold (default 5ns)
20//! - Batch flush latency exceeds threshold
21//! - Session/producer count growth
22//!
23//! # Architecture
24//!
25//! ```text
26//! Producers -----+
27//!                |
28//!                v
29//!     +----------------------------+
30//!     |   Dynamic Shard Mapper     |
31//!     +----------------------------+
32//!        |         |         |
33//!        v         v         v
34//!     Shard 0   Shard 1   Shard 2 …
35//! ```
36
37use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering as AtomicOrdering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use parking_lot::RwLock;
42
43use crate::config::ScalingPolicy;
44
/// Point-in-time snapshot of one shard's load, consumed by scaling decisions.
#[derive(Debug, Clone)]
pub struct ShardMetrics {
    /// Id of the shard this snapshot describes.
    pub shard_id: u16,
    /// Buffer occupancy as a fraction of capacity (nominally 0.0 to 1.0).
    pub fill_ratio: f64,
    /// Number of events ingested during the sampled window.
    pub event_rate: u64,
    /// Mean push latency over the window, in nanoseconds.
    pub avg_push_latency_ns: u64,
    /// Mean batch-flush latency over the window, in microseconds.
    pub avg_flush_latency_us: u64,
    /// `true` while the shard is in drain mode.
    pub draining: bool,
    /// Load-balancing weight derived from the other fields; smaller means
    /// less loaded.
    pub weight: f64,
    /// Instant at which this snapshot was produced.
    pub last_updated: Instant,
}
65
66impl ShardMetrics {
67    /// Create new metrics for a shard.
68    pub fn new(shard_id: u16) -> Self {
69        Self {
70            shard_id,
71            fill_ratio: 0.0,
72            event_rate: 0,
73            avg_push_latency_ns: 0,
74            avg_flush_latency_us: 0,
75            draining: false,
76            weight: 0.0,
77            last_updated: Instant::now(),
78        }
79    }
80
81    /// Compute the weight based on current metrics.
82    /// Lower weight = better candidate for new producers.
83    pub fn compute_weight(&mut self) {
84        // Weight formula: combines fill ratio, latency, and event rate
85        // Higher fill ratio = higher weight (avoid overloaded shards)
86        // Higher latency = higher weight
87        // Higher event rate = higher weight
88        let fill_weight = self.fill_ratio * 100.0;
89        let latency_weight = (self.avg_push_latency_ns as f64) / 10.0;
90        let rate_weight = (self.event_rate as f64) / 1_000_000.0;
91
92        self.weight = fill_weight + latency_weight + rate_weight;
93
94        // Draining shards get maximum weight (never assign new producers)
95        if self.draining {
96            self.weight = f64::MAX;
97        }
98    }
99}
100
101/// Live metrics collector for a shard (atomics for lock-free updates).
102#[derive(Debug)]
103pub struct ShardMetricsCollector {
104    /// Shard identifier.
105    shard_id: u16,
106    /// Ring buffer capacity.
107    capacity: usize,
108    /// Current buffer length (updated by shard).
109    current_len: AtomicU64,
110    /// Events ingested in current window.
111    events_in_window: AtomicU64,
112    /// Packed `(count << 32) | sum` for push latencies (ns).
113    /// Pre-fix `push_latency_sum_ns` and `push_count` were
114    /// independent `AtomicU64`s. `record_push` did two separate
115    /// `fetch_add`s, and `collect_and_reset` did two separate
116    /// `swap`s. A metrics tick interleaving between the two
117    /// `fetch_add`s captured the sum WITHOUT the count (or
118    /// vice versa); the resulting `avg = sum.checked_div(count)
119    /// .unwrap_or(0)` returned 0 in window N (sum without
120    /// count) and 0 in window N+1 (count without sum), silently
121    /// zeroing the average that drives `evaluate_scaling`'s
122    /// push-latency scale-up trigger. Packing into one u64 makes
123    /// the `(sum, count)` update atomic; the upper 32 bits hold
124    /// the count (u32::MAX = 4G calls/window — plenty) and the
125    /// lower 32 hold the sum (u32::MAX = 4 G ns ≈ 4 s, also
126    /// plenty for any sane window).
127    push_latency: AtomicU64,
128    /// Packed `(count << 32) | sum` for flush latencies (us).
129    /// Same shape and rationale as `push_latency`. The lower 32
130    /// bits hold sum-µs (u32::MAX ≈ 4 Gµs ≈ 67 minutes — far
131    /// past any plausible window).
132    flush_latency: AtomicU64,
133    /// Whether this shard is draining.
134    draining: AtomicBool,
135    /// Window start time.
136    window_start: RwLock<Instant>,
137    /// Pushes observed since `set_draining(true)` was last called.
138    /// Distinct from `events_in_window` because this counter is NOT
139    /// reset by `collect_and_reset`. `finalize_draining` reads this
140    /// instead of `events_in_window` so a drain-window-overlap with
141    /// a metrics tick can no longer race the counter to zero before
142    /// the producer is observed.
143    pushes_since_drain_start: AtomicU64,
144}
145
146impl ShardMetricsCollector {
147    /// Create a new metrics collector.
148    pub fn new(shard_id: u16, capacity: usize) -> Self {
149        Self {
150            shard_id,
151            capacity,
152            current_len: AtomicU64::new(0),
153            events_in_window: AtomicU64::new(0),
154            push_latency: AtomicU64::new(0),
155            flush_latency: AtomicU64::new(0),
156            draining: AtomicBool::new(false),
157            window_start: RwLock::new(Instant::now()),
158            pushes_since_drain_start: AtomicU64::new(0),
159        }
160    }
161
162    /// Record current buffer length.
163    #[inline]
164    pub fn record_buffer_len(&self, len: usize) {
165        self.current_len.store(len as u64, AtomicOrdering::Relaxed);
166    }
167
168    /// Record an event ingestion.
169    #[inline]
170    pub fn record_push(&self, latency_ns: u64) {
171        self.events_in_window.fetch_add(1, AtomicOrdering::Relaxed);
172        // Atomically add 1 to count (upper 32 bits) and
173        // `latency_ns` to sum (lower 32 bits). `fetch_update`
174        // CAS-loops the load-and-store, so a concurrent
175        // `collect_and_reset` swap on the same word either sees
176        // both pre-add or both post-add — no `(sum, count)`
177        // desync. Saturating ops cap at u32::MAX inside the
178        // pack window (~4 G calls / 4 s of accumulated latency),
179        // which is far beyond any sane metrics tick.
180        let _ =
181            self.push_latency
182                .fetch_update(AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |v| {
183                    let count = (v >> 32) as u32;
184                    let sum = (v & 0xFFFF_FFFF) as u32;
185                    let new_count = count.saturating_add(1) as u64;
186                    let new_sum = sum.saturating_add(latency_ns.min(u32::MAX as u64) as u32) as u64;
187                    Some((new_count << 32) | new_sum)
188                });
189        // Always increment — the cost is one fetch_add and the
190        // counter only matters when the shard is draining. Cheaper
191        // than branching on `self.draining.load()` in the hot path.
192        self.pushes_since_drain_start
193            .fetch_add(1, AtomicOrdering::Relaxed);
194    }
195
196    /// Record a batch flush.
197    #[inline]
198    pub fn record_flush(&self, latency_us: u64) {
199        // Same packed-`(count, sum)` shape as `record_push` —
200        // see that function for the desync rationale.
201        let _ = self.flush_latency.fetch_update(
202            AtomicOrdering::Relaxed,
203            AtomicOrdering::Relaxed,
204            |v| {
205                let count = (v >> 32) as u32;
206                let sum = (v & 0xFFFF_FFFF) as u32;
207                let new_count = count.saturating_add(1) as u64;
208                let new_sum = sum.saturating_add(latency_us.min(u32::MAX as u64) as u32) as u64;
209                Some((new_count << 32) | new_sum)
210            },
211        );
212    }
213
214    /// Set drain mode.
215    ///
216    /// Transitions to draining reset `pushes_since_drain_start` so
217    /// `finalize_draining` only counts pushes that arrived after the
218    /// drain began.
219    ///
220    /// A concurrent `record_push` can interleave with the store-zero
221    /// on `pushes_since_drain_start` and leave the counter at `1`
222    /// rather than `0`. That's not a correctness bug — it just defers
223    /// finalization of this shard by one metrics tick — but the
224    /// previous code did the store-zero on the counter *before*
225    /// publishing the `draining=true` flag, which made the window
226    /// slightly larger than necessary. Publishing the flag first
227    /// means any push that *observes* `draining=true` is naturally
228    /// sequenced after the reset; pushes that beat the flag publish
229    /// race the reset just like before.
230    ///
231    /// We also use `SeqCst` for the publish to give the rest of the
232    /// crate a single total order on draining transitions, which
233    /// matches the ordering on `try_enter_ingest`'s shutdown flag.
234    pub fn set_draining(&self, draining: bool) {
235        if draining {
236            // Store-zero first (so the flag publish below acts as
237            // the release-fence for both writes).
238            self.pushes_since_drain_start
239                .store(0, AtomicOrdering::SeqCst);
240        }
241        self.draining.store(draining, AtomicOrdering::SeqCst);
242    }
243
244    /// Number of pushes observed since `set_draining(true)` was
245    /// last called. Used by `finalize_draining` to detect lingering
246    /// producers that the window-reset `events_in_window` counter
247    /// can race past.
248    ///
249    /// Pre-fix used `Ordering::Relaxed`, but the writer
250    /// side (`set_draining(true)`) resets the counter under
251    /// `SeqCst`. On weakly-ordered hardware (ARM), a Relaxed
252    /// reader could observe a stale counter and `finalize_draining`
253    /// would falsely conclude the drain had flushed while
254    /// producers were still pushing. Acquire pairs with the
255    /// SeqCst release of the reset (SeqCst includes Release
256    /// semantics), making the reset happen-before this load.
257    pub fn pushes_since_drain_start(&self) -> u64 {
258        self.pushes_since_drain_start.load(AtomicOrdering::Acquire)
259    }
260
261    /// Check if draining.
262    pub fn is_draining(&self) -> bool {
263        self.draining.load(AtomicOrdering::Acquire)
264    }
265
266    /// Collect metrics and reset window counters.
267    ///
268    /// NOTE: The individual atomic swaps below are not collectively atomic with
269    /// respect to concurrent `record_push`/`record_flush` calls. This means a
270    /// push recorded between, say, the `events_in_window` swap and the
271    /// `push_count` swap could be counted in one counter but not the other for
272    /// a given window. This small inaccuracy is an accepted trade-off to
273    /// preserve the lock-free design of the hot path (`record_push` /
274    /// `record_flush`). Adding a `Mutex` here would serialize the hot path
275    /// and defeat the purpose of using atomics.
276    pub fn collect_and_reset(&self) -> ShardMetrics {
277        let current_len = self.current_len.load(AtomicOrdering::Relaxed);
278        let events = self.events_in_window.swap(0, AtomicOrdering::Relaxed);
279        // Single swap captures `(count, sum)` together; no
280        // chance of catching the sum without the matching count
281        // (or vice versa) the way two independent swaps did.
282        let push_packed = self.push_latency.swap(0, AtomicOrdering::Relaxed);
283        let push_count = push_packed >> 32;
284        let push_latency_sum = push_packed & 0xFFFF_FFFF;
285        let flush_packed = self.flush_latency.swap(0, AtomicOrdering::Relaxed);
286        let flush_count = flush_packed >> 32;
287        let flush_latency_sum = flush_packed & 0xFFFF_FFFF;
288
289        let fill_ratio = if self.capacity > 0 {
290            current_len as f64 / self.capacity as f64
291        } else {
292            0.0
293        };
294
295        let avg_push_latency = push_latency_sum.checked_div(push_count).unwrap_or(0);
296
297        let avg_flush_latency = flush_latency_sum.checked_div(flush_count).unwrap_or(0);
298
299        // Reset window
300        *self.window_start.write() = Instant::now();
301
302        let mut metrics = ShardMetrics {
303            shard_id: self.shard_id,
304            fill_ratio,
305            event_rate: events,
306            avg_push_latency_ns: avg_push_latency,
307            avg_flush_latency_us: avg_flush_latency,
308            draining: self.draining.load(AtomicOrdering::Acquire),
309            weight: 0.0,
310            last_updated: Instant::now(),
311        };
312        metrics.compute_weight();
313        metrics
314    }
315}
316
/// Outcome of a scaling evaluation by the mapper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingDecision {
    /// Leave the shard count as it is.
    None,
    /// Add this many shards.
    ScaleUp(u16),
    /// Retire this many shards by marking them for draining.
    ScaleDown(u16),
}
327
/// Failure modes of a scaling operation.
///
/// Derives `PartialEq`/`Eq` (like [`ScalingDecision`] and [`ShardState`]) so
/// callers and tests can compare results directly, e.g. with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingError {
    /// The scaling policy failed validation; the payload explains why.
    InvalidPolicy(String),
    /// The shard count is already at the policy maximum.
    AtMaxShards,
    /// The shard count is already at the policy minimum.
    AtMinShards,
    /// A previous scaling operation is still within its cooldown.
    InCooldown,
    /// A new shard could not be created; the payload explains why.
    ShardCreationFailed(String),
}
342
343impl std::fmt::Display for ScalingError {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        match self {
346            Self::InvalidPolicy(msg) => write!(f, "invalid scaling policy: {}", msg),
347            Self::AtMaxShards => write!(f, "already at maximum shard count"),
348            Self::AtMinShards => write!(f, "already at minimum shard count"),
349            Self::InCooldown => write!(f, "scaling operation in cooldown"),
350            Self::ShardCreationFailed(msg) => write!(f, "shard creation failed: {}", msg),
351        }
352    }
353}
354
355impl std::error::Error for ScalingError {}
356
/// Lifecycle state of a shard tracked by the mapper.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardState {
    /// The shard has an id and a metrics collector, but its upstream
    /// workers (drain / batch) are not running yet, so `select_shard` must
    /// not route to it. The caller promotes it to `Active` through
    /// `activate` once the workers exist. This state prevents a brand-new
    /// shard from accepting pushes before anything can consume them.
    Provisioning,
    /// The shard accepts producers.
    Active,
    /// The shard takes no new producers and is waiting to empty.
    Draining,
    /// The shard has stopped and may be removed.
    Stopped,
}
374
375/// Information about a shard managed by the mapper.
376#[derive(Debug)]
377struct MappedShard {
378    /// Shard ID.
379    id: u16,
380    /// Current state.
381    state: ShardState,
382    /// Metrics collector.
383    metrics: Arc<ShardMetricsCollector>,
384    /// When this shard entered drain mode (if draining).
385    drain_started: Option<Instant>,
386    /// Last collected metrics.
387    last_metrics: ShardMetrics,
388    /// When this shard last transitioned to `Active`. Used by
389    /// `evaluate_scaling` to skip recently-activated shards from
390    /// scale decisions: their `last_metrics` is the
391    /// `ShardMetrics::new(id)` placeholder until at least one
392    /// `collect_metrics` cycle has run, and the placeholder
393    /// (`fill_ratio = 0.0, event_rate = 0`) trips the
394    /// underutilized trigger immediately — oscillating the system
395    /// (scale-up → next tick scale-down → next tick scale-up …)
396    /// when a fresh shard is added but hasn't yet absorbed any
397    /// traffic.
398    activated_at: Instant,
399}
400
/// Boxed, thread-safe callback invoked with a shard id on lifecycle events
/// (shard created / shard removed).
type ShardCallback = Box<dyn Fn(u16) + Send + Sync>;
403
404/// Dynamic shard mapper that manages shard allocation and producer routing.
405///
406/// This is the core component for dynamic scaling. It:
407/// - Tracks metrics for all shards
408/// - Makes scaling decisions based on policy
409/// - Routes producers to the least-loaded shards
410/// - Manages shard lifecycle (active → draining → stopped)
411pub struct ShardMapper {
412    /// Mapped shards (RwLock for concurrent reads, rare writes).
413    shards: RwLock<Vec<MappedShard>>,
414    /// Current active shard count.
415    active_count: AtomicU16,
416    /// Scaling policy.
417    ///
418    /// This field is immutable for the lifetime of the mapper. The
419    /// previous `set_policy(&mut self, …)` API was unreachable in
420    /// practice — every production callsite holds the mapper behind
421    /// an `Arc`, and `Arc::get_mut` requires a strong count of 1,
422    /// which never holds once the worker pool clones the `Arc`. The
423    /// method has been removed; recreate the mapper (and
424    /// the bus that owns it) to change the policy.
425    policy: ScalingPolicy,
426    /// Ring buffer capacity for new shards.
427    ring_buffer_capacity: usize,
428    /// Last scaling operation timestamp.
429    ///
430    /// This RwLock is **logically** scoped to the
431    /// outer `shards.write()` lock — `scale_up`, `scale_down`,
432    /// and `scale_up_provisioning` all read this field and write
433    /// to it while holding `shards.write()`. The cooldown gate
434    /// is therefore atomic with the scale mutation: no caller
435    /// can pass the cooldown check, observe stale `last_scaling`,
436    /// and have its mutation interleave with another caller.
437    ///
438    /// **If you narrow `shards.write()`'s scope in any of those
439    /// callers, this implicit serialization breaks.** Either:
440    ///   - Keep the cooldown read+write inside the outer lock, OR
441    ///   - Use a `compare_exchange`-style update on a single
442    ///     `AtomicI64` of nanos so the gate is atomic on its own.
443    ///
444    /// The doc-comment is here so a future refactorer doesn't
445    /// silently break the contract.
446    last_scaling: RwLock<Option<Instant>>,
447    /// Callback for shard creation (provided by ShardManager).
448    on_shard_created: RwLock<Option<ShardCallback>>,
449    /// Callback for shard removal (provided by ShardManager).
450    on_shard_removed: RwLock<Option<ShardCallback>>,
451    /// Monotonic shard-id allocator. The next `scale_up` call gets
452    /// `fetch_add(1)` from here. Distinct from `shards.iter().max() +
453    /// 1`: that approach reused ids whenever the highest-numbered
454    /// shard had been drained-and-removed, silently merging two
455    /// unrelated shard lifetimes in any external system that keys
456    /// metrics or checkpoints on shard id. Monotonic allocation
457    /// ensures every shard ever allocated has a globally unique id
458    /// for the lifetime of this mapper.
459    next_shard_id: AtomicU16,
460}
461
462impl ShardMapper {
463    /// Create a new shard mapper with the given initial shard count and policy.
464    pub fn new(
465        initial_shards: u16,
466        ring_buffer_capacity: usize,
467        policy: ScalingPolicy,
468    ) -> Result<Self, ScalingError> {
469        let mut policy = policy.normalize();
470        // Ensure max_shards can accommodate the initial shard count
471        if policy.max_shards < initial_shards {
472            policy.max_shards = initial_shards;
473        }
474        policy
475            .validate()
476            .map_err(|e| ScalingError::InvalidPolicy(e.to_string()))?;
477
478        // Initial shards: stamp `activated_at` far enough in the
479        // past that they're not subject to the warmup skip in
480        // `evaluate_scaling`. The boot-time shards have whatever
481        // baseline traffic the system serves; they shouldn't be
482        // exempted from scale decisions just because the mapper
483        // was just constructed.
484        let boot = Instant::now()
485            .checked_sub(std::time::Duration::from_secs(3600))
486            .unwrap_or_else(Instant::now);
487        let shards: Vec<MappedShard> = (0..initial_shards)
488            .map(|id| MappedShard {
489                id,
490                state: ShardState::Active,
491                metrics: Arc::new(ShardMetricsCollector::new(id, ring_buffer_capacity)),
492                drain_started: None,
493                last_metrics: ShardMetrics::new(id),
494                activated_at: boot,
495            })
496            .collect();
497
498        Ok(Self {
499            shards: RwLock::new(shards),
500            active_count: AtomicU16::new(initial_shards),
501            policy,
502            ring_buffer_capacity,
503            last_scaling: RwLock::new(None),
504            on_shard_created: RwLock::new(None),
505            on_shard_removed: RwLock::new(None),
506            // Initial shards occupy ids `[0, initial_shards)`, so the
507            // first scale-up takes `initial_shards`.
508            next_shard_id: AtomicU16::new(initial_shards),
509        })
510    }
511
512    /// Set callback for shard creation.
513    pub fn set_on_shard_created<F>(&self, callback: F)
514    where
515        F: Fn(u16) + Send + Sync + 'static,
516    {
517        *self.on_shard_created.write() = Some(Box::new(callback));
518    }
519
520    /// Set callback for shard removal.
521    pub fn set_on_shard_removed<F>(&self, callback: F)
522    where
523        F: Fn(u16) + Send + Sync + 'static,
524    {
525        *self.on_shard_removed.write() = Some(Box::new(callback));
526    }
527
528    /// Get the metrics collector for a shard.
529    pub fn metrics_collector(&self, shard_id: u16) -> Option<Arc<ShardMetricsCollector>> {
530        let shards = self.shards.read();
531        shards
532            .iter()
533            .find(|s| s.id == shard_id)
534            .map(|s| s.metrics.clone())
535    }
536
537    /// Get current active shard count.
538    pub fn active_shard_count(&self) -> u16 {
539        self.active_count.load(AtomicOrdering::Acquire)
540    }
541
542    /// Get total shard count (including draining).
543    pub fn total_shard_count(&self) -> u16 {
544        self.shards.read().len() as u16
545    }
546
547    /// Select the best shard for a new event/producer.
548    ///
549    /// This implements weighted shard selection:
550    /// - Only considers active (non-draining) shards
551    /// - Prefers shards with lower weight (less loaded)
552    /// - Falls back to round-robin if weights are equal
553    #[inline]
554    pub fn select_shard(&self, event_hash: u64) -> u16 {
555        let shards = self.shards.read();
556
557        // Filter to active shards only
558        let active: Vec<_> = shards
559            .iter()
560            .filter(|s| s.state == ShardState::Active)
561            .collect();
562
563        if active.is_empty() {
564            // Previously fell back to `find(|s| s.state !=
565            // ShardState::Stopped)`, which silently routed to a
566            // `Draining` shard. Pushes to a draining shard increment
567            // `pushes_since_drain_start`, blocking finalization
568            // indefinitely.
569            //
570            // `Provisioning` shards aren't routable either — they
571            // exist in the mapper but the routing table doesn't have
572            // them yet, so any caller that tries to push lands
573            // in `resolve_idx → None` and surfaces as "unrouted" via
574            // the manager-level counter. That's the correct signal:
575            // the system has no destination for this event, the
576            // caller should back off and retry.
577            //
578            // Return `u16::MAX` (out-of-band sentinel) so callers
579            // that look up the id in the routing table get a
580            // definite miss, which the manager already accounts for.
581            return u16::MAX;
582        }
583
584        // Find the shard with lowest weight
585        let min_weight = active
586            .iter()
587            .map(|s| s.last_metrics.weight)
588            .fold(f64::MAX, f64::min);
589
590        // Get all shards with the minimum weight (within tolerance)
591        let candidates: Vec<_> = active
592            .iter()
593            .filter(|s| (s.last_metrics.weight - min_weight).abs() < 0.1)
594            .collect();
595
596        // Use hash to pick among candidates for determinism
597        // Fallback to first active shard if tolerance filter excludes all (e.g. NaN weights)
598        if candidates.is_empty() {
599            return active[0].id;
600        }
601        // Pre-fix used `(event_hash as usize) % candidates.len()`,
602        // which biases low-bucket indices when `candidates.len()`
603        // is not a power of two. With u64 hashes the bias is small
604        // but non-zero and accumulates over time as a sustained
605        // skew toward shards at the low end of the candidate
606        // vector. Lemire's technique
607        // (https://lemire.me/blog/2016/06/30/fast-random-shuffling/)
608        // computes the index as `(hash * len) >> 64` — an unbiased
609        // integer mapping for any `len` that fits in u64.
610        let idx = ((event_hash as u128 * candidates.len() as u128) >> 64) as usize;
611        candidates[idx].id
612    }
613
614    /// Collect metrics from all shards and update weights.
615    pub fn collect_metrics(&self) -> Vec<ShardMetrics> {
616        let mut shards = self.shards.write();
617        shards
618            .iter_mut()
619            .map(|s| {
620                s.last_metrics = s.metrics.collect_and_reset();
621                s.last_metrics.clone()
622            })
623            .collect()
624    }
625
626    /// Evaluate scaling based on current metrics.
627    ///
628    /// Returns a scaling decision without executing it.
629    pub fn evaluate_scaling(&self) -> ScalingDecision {
630        if !self.policy.auto_scale {
631            return ScalingDecision::None;
632        }
633
634        // Check cooldown
635        if let Some(last) = *self.last_scaling.read() {
636            if last.elapsed() < self.policy.cooldown {
637                return ScalingDecision::None;
638            }
639        }
640
641        let shards = self.shards.read();
642        let active_count = self.active_count.load(AtomicOrdering::Acquire);
643
644        // Check for scale-up triggers
645        let mut overloaded_count = 0;
646        let mut underutilized_count = 0;
647
648        // Warmup window for freshly-activated shards. A just-
649        // activated shard's `last_metrics` is the
650        // `ShardMetrics::new(id)` placeholder
651        // (`fill_ratio = 0.0, event_rate = 0`) until at least one
652        // `collect_metrics` cycle has run. The placeholder
653        // immediately matches the underutilized trigger
654        // (`fill_ratio < underutilized_threshold && event_rate
655        // == 0`), so a fresh shard added by scale-up would
656        // immediately count as underutilized on the next
657        // `evaluate_scaling` and trigger scale-down — oscillating
658        // the system. Reuse `policy.cooldown` as the warmup
659        // window: it's already the minimum gap between scaling
660        // actions, and a shard collected at least once within
661        // that window has accumulated real metrics.
662        let now = Instant::now();
663        let warmup = self.policy.cooldown;
664
665        for shard in shards.iter() {
666            if shard.state != ShardState::Active {
667                continue;
668            }
669
670            // Skip the placeholder-metrics window for freshly-
671            // activated shards. They count toward `active_count`
672            // (so the budget math stays consistent) but don't
673            // tip the overload/underutilized tallies.
674            if now.duration_since(shard.activated_at) < warmup {
675                continue;
676            }
677
678            let m = &shard.last_metrics;
679
680            // Scale-up triggers
681            if m.fill_ratio > self.policy.fill_ratio_threshold
682                || m.avg_push_latency_ns > self.policy.push_latency_threshold_ns
683                || m.avg_flush_latency_us > self.policy.flush_latency_threshold_us
684            {
685                overloaded_count += 1;
686            }
687
688            // Scale-down triggers
689            if m.fill_ratio < self.policy.underutilized_threshold && m.event_rate == 0 {
690                underutilized_count += 1;
691            }
692        }
693
694        // Scale up if more than half of shards are overloaded
695        if overloaded_count > active_count / 2 && active_count < self.policy.max_shards {
696            // Add shards proportional to overload
697            let shards_to_add = (overloaded_count / 2)
698                .max(1)
699                .min(self.policy.max_shards - active_count);
700            return ScalingDecision::ScaleUp(shards_to_add);
701        }
702
703        // Scale down if more than half of shards are underutilized
704        // and we're above minimum
705        if underutilized_count > active_count / 2 && active_count > self.policy.min_shards {
706            let shards_to_remove = (underutilized_count / 2)
707                .max(1)
708                .min(active_count - self.policy.min_shards);
709            return ScalingDecision::ScaleDown(shards_to_remove);
710        }
711
712        ScalingDecision::None
713    }
714
715    /// Validate cooldown + max-shards budget for an `add count`
716    /// scale-up request. Cheap pre-check that doesn't take the
717    /// write lock — callers re-check under the lock.
718    fn check_scale_up_budget(&self, count: u16) -> Result<(), ScalingError> {
719        let current = self.active_count.load(AtomicOrdering::Acquire);
720        let would_be = current
721            .checked_add(count)
722            .ok_or(ScalingError::AtMaxShards)?;
723        if would_be > self.policy.max_shards {
724            return Err(ScalingError::AtMaxShards);
725        }
726        let last = self.last_scaling.read();
727        if let Some(ts) = *last {
728            if ts.elapsed() < self.policy.cooldown {
729                return Err(ScalingError::InCooldown);
730            }
731        }
732        Ok(())
733    }
734
735    /// Allocate `count` shard ids and push their `MappedShard`
736    /// records into `shards` with the supplied `state`. The caller
737    /// already holds `self.shards.write()` and is responsible for
738    /// dropping it before notifying callbacks. Returns the allocated
739    /// ids in order.
740    ///
741    /// Performs the budget + cooldown re-check under the write lock,
742    /// the next_shard_id allocation, and the per-shard push. Does NOT
743    /// touch `active_count` — `scale_up` bumps it for `Active` shards;
744    /// `Provisioning` shards bump it later when `activate` fires.
745    fn allocate_shards_inner(
746        &self,
747        count: u16,
748        state: ShardState,
749        shards: &mut Vec<MappedShard>,
750    ) -> Result<Vec<u16>, ScalingError> {
751        self.allocate_shards_inner_with_policy(count, state, shards, false)
752    }
753
754    fn allocate_shards_inner_with_policy(
755        &self,
756        count: u16,
757        state: ShardState,
758        shards: &mut Vec<MappedShard>,
759        force: bool,
760    ) -> Result<Vec<u16>, ScalingError> {
761        // Re-check budget under the write lock — two concurrent
762        // scale-up callers could both pass the read-locked early
763        // check, both serialize through `shards.write()`, and both
764        // succeed without this re-check.
765        if force {
766            // Budget only — skip cooldown for operator-initiated paths.
767            let current = self.active_count.load(AtomicOrdering::Acquire);
768            let would_be = current
769                .checked_add(count)
770                .ok_or(ScalingError::AtMaxShards)?;
771            if would_be > self.policy.max_shards {
772                return Err(ScalingError::AtMaxShards);
773            }
774        } else {
775            self.check_scale_up_budget(count)?;
776        }
777
778        let first_id = self.next_shard_id.load(AtomicOrdering::Relaxed);
779        let last_needed = first_id
780            .checked_add(count.saturating_sub(1))
781            .ok_or(ScalingError::AtMaxShards)?;
782        // Reserve `u16::MAX` as a sentinel so the post-allocation
783        // store cannot wrap.
784        if last_needed == u16::MAX {
785            return Err(ScalingError::AtMaxShards);
786        }
787        // `first_id + count == last_needed + 1`. We already
788        // refused `last_needed == u16::MAX` above, so the sum is
789        // provably <= u16::MAX. Use `checked_add` anyway as a
790        // belt-and-suspenders guard: a future change that
791        // weakens the sentinel check would otherwise reach an
792        // unchecked u16 wrap here, silently rolling
793        // `next_shard_id` back to 0 and then re-issuing already-
794        // allocated ids.
795        let next_id_after = first_id
796            .checked_add(count)
797            .ok_or(ScalingError::AtMaxShards)?;
798        self.next_shard_id
799            .store(next_id_after, AtomicOrdering::Relaxed);
800
801        let mut new_ids = Vec::with_capacity(count as usize);
802        let now = Instant::now();
803        for i in 0..count {
804            let new_id = first_id + i;
805            shards.push(MappedShard {
806                id: new_id,
807                state,
808                metrics: Arc::new(ShardMetricsCollector::new(
809                    new_id,
810                    self.ring_buffer_capacity,
811                )),
812                drain_started: None,
813                last_metrics: ShardMetrics::new(new_id),
814                // Stamp the activation moment so `evaluate_scaling`
815                // can skip this shard until at least one collect
816                // cycle has run. Prevents the placeholder
817                // (`fill_ratio = 0, event_rate = 0`) from
818                // immediately tripping the underutilized trigger
819                // and oscillating the system.
820                activated_at: now,
821            });
822            new_ids.push(new_id);
823        }
824        Ok(new_ids)
825    }
826
827    /// Execute a scale-up operation.
828    ///
829    /// Creates new shards in the `Active` state and makes them
830    /// immediately available for routing. Use [`scale_up_provisioning`]
831    /// if upstream workers (drain / batch) need to be wired up before
832    /// the shard becomes selectable — otherwise producer pushes can
833    /// race ahead of consumer creation.
834    ///
835    /// [`scale_up_provisioning`]: Self::scale_up_provisioning
836    pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
837        // Short-circuit `count == 0` so a no-op call doesn't bump the
838        // cooldown timestamp or trip the `u16::MAX` sentinel check
839        // (which previously fired spuriously when
840        // `first_id == u16::MAX` even though zero ids were being
841        // allocated).
842        if count == 0 {
843            return Ok(Vec::new());
844        }
845
846        self.check_scale_up_budget(count)?;
847
848        let mut shards = self.shards.write();
849        let new_ids = self.allocate_shards_inner(count, ShardState::Active, &mut shards)?;
850
851        // Update counts. `fetch_add` cannot wrap here — the
852        // `check_scale_up_budget` gate above ensures `current + count <=
853        // max_shards <= u16::MAX` — but keep the ordering explicit
854        // so the next contender's cooldown re-check sees the fresh
855        // timestamp.
856        self.active_count.fetch_add(count, AtomicOrdering::Release);
857        *self.last_scaling.write() = Some(Instant::now());
858
859        // Drop the write lock before notifying callbacks — they
860        // are user-supplied and may take arbitrary time.
861        drop(shards);
862
863        if let Some(callback) = self.on_shard_created.read().as_ref() {
864            for &id in &new_ids {
865                callback(id);
866            }
867        }
868
869        Ok(new_ids)
870    }
871
872    /// Like [`scale_up`], but the new shards are created in the
873    /// `Provisioning` state. They receive an id and a metrics
874    /// collector, but `select_shard` will not route to them and they
875    /// are excluded from `active_shard_count` / `evaluate_scaling`
876    /// until the caller transitions each shard with [`activate`].
877    ///
878    /// Use this when upstream consumer infrastructure (drain/batch
879    /// workers, mpsc channels, etc.) must be wired up *before* the
880    /// shard becomes selectable. Without this gating, producers can
881    /// observe the shard via `select_shard`, push into its ring
882    /// buffer, and never have those events drained.
883    ///
884    /// Returns the allocated ids in order. Cooldown / `max_shards`
885    /// gating matches `scale_up` so that staged allocation cannot be
886    /// used to bypass the policy.
887    ///
888    /// [`scale_up`]: Self::scale_up
889    /// [`activate`]: Self::activate
890    pub fn scale_up_provisioning(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
891        if count == 0 {
892            return Ok(Vec::new());
893        }
894
895        self.check_scale_up_budget(count)?;
896
897        let mut shards = self.shards.write();
898        let new_ids = self.allocate_shards_inner(count, ShardState::Provisioning, &mut shards)?;
899
900        // Bump cooldown but NOT active_count — these shards are
901        // not active yet. `activate` bumps active_count when each
902        // becomes selectable.
903        *self.last_scaling.write() = Some(Instant::now());
904        drop(shards);
905        // Intentionally do NOT fire `on_shard_created` here — the
906        // callback signals "shard is live"; provisioning shards are
907        // not. `activate` fires it instead.
908        Ok(new_ids)
909    }
910
911    /// Allocate `count` Provisioning shards, bypassing the cooldown gate.
912    ///
913    /// Used by operator-initiated `manual_scale_up` paths. The
914    /// cooldown exists to prevent the auto-scaling monitor from
915    /// scaling-up too aggressively in response to transient
916    /// load spikes; a manual call from an operator is a
917    /// deliberate request that should not be rate-limited by
918    /// the auto-scaling cadence. The budget check (against
919    /// `max_shards`) still applies.
920    ///
921    /// Pre-fix `manual_scale_up(N)` looped `add_shard()` N
922    /// times, each call invoking `scale_up_provisioning(1)`
923    /// which bumped `last_scaling`. The second call then
924    /// immediately failed with `InCooldown` (default 30s
925    /// cooldown), leaving the first shard half-added and
926    /// returning an error to the operator with no rollback.
927    pub fn scale_up_provisioning_force(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
928        if count == 0 {
929            return Ok(Vec::new());
930        }
931
932        let mut shards = self.shards.write();
933        let new_ids = self.allocate_shards_inner_with_policy(
934            count,
935            ShardState::Provisioning,
936            &mut shards,
937            true, // skip cooldown
938        )?;
939
940        // Still bump the cooldown timestamp so the next
941        // *auto-scaling* tick respects the cooldown floor.
942        *self.last_scaling.write() = Some(Instant::now());
943        drop(shards);
944        Ok(new_ids)
945    }
946
947    /// Transition a `Provisioning` shard to `Active`.
948    ///
949    /// Returns `Ok(true)` if a state transition actually occurred
950    /// (Provisioning → Active) and `Ok(false)` if the shard was
951    /// already `Active` — the latter is the idempotent path.
952    /// Returns `InvalidPolicy` for unknown or `Draining`/`Stopped`
953    /// shards — those states require a different lifecycle path.
954    /// Bumps `active_count` and notifies the `on_shard_created`
955    /// callback exactly once per real transition.
956    ///
957    /// Pre-fix this returned `Result<(), ScalingError>`,
958    /// so callers (notably `ShardManager::activate_shard`) could
959    /// not tell whether they had bumped the live count or not and
960    /// double-incremented their own `num_shards` on every
961    /// idempotent call.
962    pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError> {
963        let mut shards = self.shards.write();
964        let shard = shards
965            .iter_mut()
966            .find(|s| s.id == shard_id)
967            .ok_or_else(|| {
968                ScalingError::InvalidPolicy(format!("activate: shard {} not found", shard_id))
969            })?;
970        match shard.state {
971            ShardState::Active => return Ok(false),
972            ShardState::Provisioning => {
973                // Gate activation on
974                // `active_count < max_shards`. The budget gate at
975                // `check_scale_up_budget` only counts ALREADY-active
976                // shards — multiple `scale_up_provisioning(1)` calls
977                // can each pass (they don't bump `active_count`),
978                // and an unconditional `fetch_add(1)` here would
979                // push past `max_shards`. Subsequent
980                // `evaluate_scaling`'s `max_shards - active_count`
981                // arithmetic would then underflow u16 (debug-build
982                // panic; release wraps to ~65530). Activate-time
983                // re-checks the budget with the lock held; the
984                // Provisioning shard stays in Provisioning state and
985                // the caller (e.g. `add_shard_internal`'s rollback)
986                // is responsible for tearing it down.
987                //
988                // The load + state mutation + `fetch_add` all happen
989                // while we hold the `shards.write()` guard so a
990                // concurrent `activate(distinct_id)` reads the
991                // already-bumped `active_count` and hits
992                // `AtMaxShards` instead of squeezing through the
993                // window between our state update and our
994                // `fetch_add`. Pre-fix the `fetch_add` ran after
995                // `drop(shards)` — two activates could each see a
996                // stale count below `max_shards` and both bump,
997                // transiently overshooting the budget.
998                let current = self.active_count.load(AtomicOrdering::Acquire);
999                if current >= self.policy.max_shards {
1000                    return Err(ScalingError::AtMaxShards);
1001                }
1002                shard.state = ShardState::Active;
1003                // Re-stamp `activated_at` so `evaluate_scaling`
1004                // gives this freshly-activated shard a warmup
1005                // window before counting it in the
1006                // overloaded/underutilized tallies. The
1007                // Provisioning → Active transition is the moment
1008                // traffic starts flowing; the shard's
1009                // `last_metrics` is still the
1010                // `ShardMetrics::new(id)` placeholder until the
1011                // next `collect_metrics` cycle.
1012                shard.activated_at = Instant::now();
1013                // Publish the increment while still holding the
1014                // write lock. `Release` here pairs with the
1015                // `Acquire` load above so any later activator that
1016                // takes the same lock observes our bump.
1017                self.active_count.fetch_add(1, AtomicOrdering::Release);
1018            }
1019            ShardState::Draining | ShardState::Stopped => {
1020                return Err(ScalingError::InvalidPolicy(format!(
1021                    "activate: shard {} is in state {:?}, cannot activate",
1022                    shard_id, shard.state
1023                )));
1024            }
1025        }
1026        drop(shards);
1027
1028        if let Some(callback) = self.on_shard_created.read().as_ref() {
1029            callback(shard_id);
1030        }
1031        Ok(true)
1032    }
1033
1034    /// Drain a specific shard by id, transitioning it from `Active`
1035    /// to `Draining`.
1036    ///
1037    /// Companion to `ShardManager::drain_shard`. The previous
1038    /// implementation only flipped the metrics collector's `draining`
1039    /// atomic; this version atomically updates `MappedShard.state`
1040    /// (so `select_shard` stops routing to the shard) and decrements
1041    /// `active_count` (so `evaluate_scaling`'s budget math stays
1042    /// consistent with `scale_down`). Returns an error if the shard
1043    /// is not in `Active` state, or if doing so would push the active
1044    /// count below `min_shards`.
1045    pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError> {
1046        let mut shards = self.shards.write();
1047        let current_active = self.active_count.load(AtomicOrdering::Acquire);
1048        if current_active <= self.policy.min_shards {
1049            return Err(ScalingError::AtMinShards);
1050        }
1051        let shard = shards
1052            .iter_mut()
1053            .find(|s| s.id == shard_id)
1054            .ok_or_else(|| {
1055                ScalingError::InvalidPolicy(format!("drain_specific: shard {} not found", shard_id))
1056            })?;
1057        match shard.state {
1058            ShardState::Active => {
1059                shard.state = ShardState::Draining;
1060                shard.drain_started = Some(Instant::now());
1061                shard.metrics.set_draining(true);
1062            }
1063            ShardState::Draining => return Ok(()),
1064            ShardState::Provisioning | ShardState::Stopped => {
1065                return Err(ScalingError::InvalidPolicy(format!(
1066                    "drain_specific: shard {} is in state {:?}, cannot drain",
1067                    shard_id, shard.state
1068                )));
1069            }
1070        }
1071        drop(shards);
1072        self.active_count.fetch_sub(1, AtomicOrdering::Release);
1073        // Bump `last_scaling` so a subsequent `scale_up` is gated
1074        // by the cooldown floor. Pre-fix `drain_specific` removed
1075        // a shard from Active without touching `last_scaling`, so
1076        // the sequence `drain_specific(id) → scale_up(N)`
1077        // bypassed the cooldown — `scale_down` writes
1078        // `last_scaling` precisely for this reason. From the
1079        // budget-math perspective `drain_specific` IS a scale-
1080        // down (it decrements `active_count` and trips the
1081        // `min_shards` floor), so it should also gate
1082        // re-expansion the same way.
1083        *self.last_scaling.write() = Some(Instant::now());
1084        Ok(())
1085    }
1086
1087    /// Start draining shards for scale-down.
1088    ///
1089    /// Marks shards as draining so they stop receiving new events.
1090    /// Shards will be removed once they're empty.
1091    pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1092        // Early checks (may race, but avoid acquiring the write lock)
1093        let current = self.active_count.load(AtomicOrdering::Acquire);
1094        if current <= self.policy.min_shards {
1095            return Err(ScalingError::AtMinShards);
1096        }
1097
1098        let to_drain = count.min(current - self.policy.min_shards);
1099        if to_drain == 0 {
1100            return Err(ScalingError::AtMinShards);
1101        }
1102        {
1103            let last = self.last_scaling.read();
1104            if let Some(ts) = *last {
1105                if ts.elapsed() < self.policy.cooldown {
1106                    return Err(ScalingError::InCooldown);
1107                }
1108            }
1109        }
1110
1111        let mut shards = self.shards.write();
1112
1113        // Re-check under the lock to prevent race conditions (double-check pattern)
1114        let current = self.active_count.load(AtomicOrdering::Acquire);
1115        if current <= self.policy.min_shards {
1116            return Err(ScalingError::AtMinShards);
1117        }
1118        let to_drain = count.min(current - self.policy.min_shards);
1119        if to_drain == 0 {
1120            return Err(ScalingError::AtMinShards);
1121        }
1122        // Re-check cooldown under the same write lock that gates
1123        // mutation — see the matching note in `scale_up`.
1124        {
1125            let last = self.last_scaling.read();
1126            if let Some(ts) = *last {
1127                if ts.elapsed() < self.policy.cooldown {
1128                    return Err(ScalingError::InCooldown);
1129                }
1130            }
1131        }
1132
1133        let mut drained_ids = Vec::with_capacity(to_drain as usize);
1134
1135        // Find shards with lowest weight (least utilized) to drain
1136        let mut active_indices: Vec<_> = shards
1137            .iter()
1138            .enumerate()
1139            .filter(|(_, s)| s.state == ShardState::Active)
1140            .map(|(i, s)| (i, s.last_metrics.weight))
1141            .collect();
1142
1143        // Sort by weight (ascending - drain least utilized first)
1144        active_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
1145
1146        // Mark shards for draining
1147        for (idx, _) in active_indices.into_iter().take(to_drain as usize) {
1148            shards[idx].state = ShardState::Draining;
1149            shards[idx].drain_started = Some(Instant::now());
1150            shards[idx].metrics.set_draining(true);
1151            drained_ids.push(shards[idx].id);
1152        }
1153
1154        // Update count
1155        self.active_count
1156            .fetch_sub(to_drain, AtomicOrdering::Release);
1157        *self.last_scaling.write() = Some(Instant::now());
1158
1159        Ok(drained_ids)
1160    }
1161
1162    /// Check draining shards and finalize those that are empty.
1163    ///
1164    /// Returns IDs of shards that were stopped.
1165    ///
1166    /// This predicate looks ONLY at the ring buffer
1167    /// (`current_len` + `pushes_since_drain_start`); it does NOT
1168    /// probe the per-shard mpsc channel or the BatchWorker's
1169    /// `current_batch`. A shard that the predicate flags as empty
1170    /// can still have events queued in those two places. The
1171    /// correctness gate is therefore `bus::remove_shard_internal`,
1172    /// which awaits the BatchWorker's `JoinHandle` before
1173    /// constructing the stranded-flush batch — see that function's
1174    /// step 3 for the rationale. Tightening this predicate is a
1175    /// defense-in-depth follow-up; a stricter ring-buffer-empty
1176    /// signal here would only narrow an already-closed window.
1177    pub fn finalize_draining(&self) -> Vec<u16> {
1178        let mut shards = self.shards.write();
1179        let mut stopped = Vec::new();
1180
1181        for shard in shards.iter_mut() {
1182            if shard.state == ShardState::Draining {
1183                // Check if shard is empty by reading current_len directly,
1184                // avoiding collect_and_reset() which destructively zeros all counters.
1185                let current_len = shard.metrics.current_len.load(AtomicOrdering::Relaxed);
1186                let fill_ratio = if shard.metrics.capacity > 0 {
1187                    current_len as f64 / shard.metrics.capacity as f64
1188                } else {
1189                    0.0
1190                };
1191                // Previously read `events_in_window` here, which
1192                // `collect_and_reset` zeros every metrics tick. A
1193                // producer push that landed in the window between two
1194                // ticks could be silently zeroed out, so a draining
1195                // shard whose buffer transiently emptied was finalized
1196                // with a producer still attached.
1197                // `pushes_since_drain_start` is a separate counter
1198                // that is only reset by `set_draining(true)`, so any
1199                // push observed since the drain began is sticky —
1200                // exactly the signal we want.
1201                // Acquire pairs with `set_draining`'s SeqCst reset so
1202                // the load can't observe a stale value from before the
1203                // drain began. A Relaxed load here let weakly-ordered
1204                // hardware see the pre-reset count and finalize while
1205                // a producer was still pushing.
1206                let pushes_after_drain = shard.metrics.pushes_since_drain_start();
1207                if fill_ratio == 0.0 && pushes_after_drain == 0 {
1208                    // Check if we've waited long enough
1209                    if let Some(drain_start) = shard.drain_started {
1210                        if drain_start.elapsed() > Duration::from_millis(100) {
1211                            shard.state = ShardState::Stopped;
1212                            stopped.push(shard.id);
1213                        }
1214                    }
1215                }
1216            }
1217        }
1218
1219        // Drop the write lock BEFORE notifying. The callback is
1220        // user-supplied and may re-enter the mapper (`shard_state`,
1221        // `select_shard`, `metrics_collector`, …), each of which
1222        // acquires `shards.read()`. `parking_lot::RwLock` is not
1223        // recursive, so a re-entrant read attempt while we hold a
1224        // write would deadlock. `scale_up`'s callback path already
1225        // releases its lock before calling out — mirror that
1226        // here.
1227        drop(shards);
1228
1229        if !stopped.is_empty() {
1230            if let Some(callback) = self.on_shard_removed.read().as_ref() {
1231                for &id in &stopped {
1232                    callback(id);
1233                }
1234            }
1235        }
1236
1237        stopped
1238    }
1239
1240    /// Remove a specific shard from the mapper if it is in the
1241    /// `Stopped` state. Used by `ShardManager::remove_shard` so a
1242    /// per-shard cleanup doesn't disturb sibling `Stopped`
1243    /// entries — which a sequential `manual_scale_down` loop
1244    /// still needs to look up state for. Returns `true` if the
1245    /// shard existed and was Stopped (and was removed).
1246    pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool {
1247        let mut shards = self.shards.write();
1248        let before = shards.len();
1249        shards.retain(|s| !(s.id == shard_id && s.state == ShardState::Stopped));
1250        shards.len() < before
1251    }
1252
1253    /// Remove stopped shards from the mapper.
1254    pub fn remove_stopped_shards(&self) -> Vec<u16> {
1255        let mut shards = self.shards.write();
1256        let before = shards.len();
1257        let removed: Vec<u16> = shards
1258            .iter()
1259            .filter(|s| s.state == ShardState::Stopped)
1260            .map(|s| s.id)
1261            .collect();
1262
1263        shards.retain(|s| s.state != ShardState::Stopped);
1264
1265        if shards.len() < before {
1266            tracing::info!(
1267                removed = removed.len(),
1268                remaining = shards.len(),
1269                "Removed stopped shards"
1270            );
1271        }
1272
1273        removed
1274    }
1275
1276    /// Get the state of a specific shard.
1277    pub fn shard_state(&self, shard_id: u16) -> Option<ShardState> {
1278        self.shards
1279            .read()
1280            .iter()
1281            .find(|s| s.id == shard_id)
1282            .map(|s| s.state)
1283    }
1284
1285    /// Get all active shard IDs.
1286    pub fn active_shard_ids(&self) -> Vec<u16> {
1287        self.shards
1288            .read()
1289            .iter()
1290            .filter(|s| s.state == ShardState::Active)
1291            .map(|s| s.id)
1292            .collect()
1293    }
1294
1295    /// Get all shard IDs (including draining).
1296    pub fn all_shard_ids(&self) -> Vec<u16> {
1297        self.shards
1298            .read()
1299            .iter()
1300            .filter(|s| s.state != ShardState::Stopped)
1301            .map(|s| s.id)
1302            .collect()
1303    }
1304
    /// Returns a shared reference to the scaling policy this mapper was
    /// configured with.
    ///
    /// The reference borrows from `self`; no lock is taken and nothing
    /// is cloned.
    pub fn policy(&self) -> &ScalingPolicy {
        &self.policy
    }
1309    // `set_policy` previously took `&mut self` and was unreachable
1310    // through the `Arc<ShardMapper>` that the production code holds
1311    // (`Arc::get_mut` fails once the worker pool has cloned the Arc).
1312    // The method has been removed — recreate the mapper / bus to
1313    // change the policy.
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318    use super::*;
1319
1320    #[test]
1321    fn test_shard_mapper_creation() {
1322        let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1323        assert_eq!(mapper.active_shard_count(), 4);
1324        assert_eq!(mapper.total_shard_count(), 4);
1325    }
1326
1327    #[test]
1328    fn test_select_shard_distributes() {
1329        let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1330
1331        // Different hashes should potentially select different shards
1332        let mut selected = std::collections::HashSet::new();
1333        for i in 0..100u64 {
1334            let shard = mapper.select_shard(i * 12345);
1335            selected.insert(shard);
1336        }
1337
1338        // With 4 shards, we should hit multiple
1339        assert!(!selected.is_empty());
1340    }
1341
1342    /// `select_shard`'s candidate-index computation must
1343    /// be unbiased across the u64 hash space. Pre-fix used
1344    /// `hash as usize % candidates.len()`, which over-weights low
1345    /// indices when `candidates.len()` is not a power of two.
1346    /// With u64 hashes the bias is small but non-zero and
1347    /// sustains a hot-shard skew over time. Lemire's
1348    /// `(hash * len) >> 64` is unbiased.
1349    ///
1350    /// We test the unbiased property by sampling a uniform
1351    /// distribution of u64 hashes over 3 candidate shards and
1352    /// asserting each bucket gets close to 1/3 of the picks.
1353    /// Empirical bound: ±5% across 30 000 trials with a
1354    /// well-distributed input.
1355    #[test]
1356    fn select_shard_distribution_is_unbiased() {
1357        // 3 candidates: a non-power-of-2 to expose the modulo
1358        // bias the fix removes.
1359        let mapper = ShardMapper::new(3, 1024, ScalingPolicy::default()).unwrap();
1360
1361        let trials = 30_000u64;
1362        // Spread inputs uniformly across the u64 range so the
1363        // multiply-shift mapping behaves as designed.
1364        let stride = u64::MAX / trials;
1365        let mut counts = [0u64; 3];
1366        for i in 0..trials {
1367            let h = i.wrapping_mul(stride);
1368            let id = mapper.select_shard(h);
1369            counts[id as usize] += 1;
1370        }
1371
1372        let expected = (trials / 3) as i64;
1373        for (id, &count) in counts.iter().enumerate() {
1374            let diff = (count as i64 - expected).abs();
1375            let pct = (diff as f64 / expected as f64) * 100.0;
1376            assert!(
1377                pct < 5.0,
1378                "shard {} bucket has {} hits ({:.2}% off expected {}); \
1379                 modulo bias would drift higher on certain shards",
1380                id,
1381                count,
1382                pct,
1383                expected
1384            );
1385        }
1386    }
1387
1388    #[test]
1389    fn test_scale_up() {
1390        // Explicitly set max_shards to allow scaling from 2 to 4
1391        let policy = ScalingPolicy {
1392            max_shards: 8,
1393            ..Default::default()
1394        };
1395        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1396
1397        let new_ids = mapper.scale_up(2).unwrap();
1398        assert_eq!(new_ids.len(), 2);
1399        assert_eq!(mapper.active_shard_count(), 4);
1400    }
1401
1402    #[test]
1403    fn test_scale_up_max_limit() {
1404        let policy = ScalingPolicy {
1405            max_shards: 4,
1406            ..Default::default()
1407        };
1408        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1409
1410        let result = mapper.scale_up(1);
1411        assert!(matches!(result, Err(ScalingError::AtMaxShards)));
1412    }
1413
1414    #[test]
1415    fn test_scale_down() {
1416        let policy = ScalingPolicy {
1417            min_shards: 1,
1418            cooldown: Duration::from_nanos(1), // Disable cooldown for test
1419            ..Default::default()
1420        };
1421        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1422
1423        let drained = mapper.scale_down(2).unwrap();
1424        assert_eq!(drained.len(), 2);
1425        assert_eq!(mapper.active_shard_count(), 2);
1426    }
1427
1428    #[test]
1429    fn test_scale_down_min_limit() {
1430        let policy = ScalingPolicy {
1431            min_shards: 4,
1432            ..Default::default()
1433        };
1434        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1435
1436        let result = mapper.scale_down(1);
1437        assert!(matches!(result, Err(ScalingError::AtMinShards)));
1438    }
1439
1440    #[test]
1441    fn test_metrics_collection() {
1442        let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1443
1444        // Record some metrics
1445        if let Some(collector) = mapper.metrics_collector(0) {
1446            collector.record_buffer_len(512);
1447            collector.record_push(5);
1448            collector.record_push(10);
1449        }
1450
1451        let metrics = mapper.collect_metrics();
1452        assert_eq!(metrics.len(), 2);
1453
1454        let shard0_metrics = metrics.iter().find(|m| m.shard_id == 0).unwrap();
1455        assert!(shard0_metrics.fill_ratio > 0.0);
1456    }
1457
1458    /// Regression: a `record_push` / `record_flush` interleaving
1459    /// with a `collect_and_reset` swap must NOT desync `(sum,
1460    /// count)`. Pre-fix `push_latency_sum_ns` and `push_count`
1461    /// were independent atomics; a tick between the two
1462    /// `fetch_add`s captured the sum without the matching count
1463    /// (or the count without the sum). The resulting `avg =
1464    /// sum.checked_div(count).unwrap_or(0)` returned 0 in window
1465    /// N (sum without count) AND 0 in window N+1 (count without
1466    /// sum) — silently zeroing the average that drives
1467    /// `evaluate_scaling`'s push-latency scale-up trigger.
1468    ///
1469    /// Post-fix `(sum, count)` is packed into one
1470    /// `AtomicU64` so the swap captures both atomically. This
1471    /// test fires N concurrent `record_push` calls and a single
1472    /// `collect_and_reset` and asserts the captured count
1473    /// matches the captured sum (i.e. `sum >= count` because
1474    /// every push contributes at least 1 ns; `sum / count` is
1475    /// well-defined for any non-zero count).
1476    #[test]
1477    fn record_push_collect_no_sum_count_desync() {
1478        use std::sync::Barrier;
1479        use std::thread;
1480
1481        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1482        const PUSHERS: usize = 4;
1483        const PUSHES_PER_THREAD: usize = 1_000;
1484
1485        let barrier = Arc::new(Barrier::new(PUSHERS + 1));
1486        let mut handles = vec![];
1487        for _ in 0..PUSHERS {
1488            let c = collector.clone();
1489            let b = barrier.clone();
1490            handles.push(thread::spawn(move || {
1491                b.wait();
1492                for i in 0..PUSHES_PER_THREAD {
1493                    // Vary latencies so sum/count averages
1494                    // aren't trivially the same number.
1495                    c.record_push((i as u64 % 100) + 1);
1496                }
1497            }));
1498        }
1499
1500        // Race a collect_and_reset across the pushers.
1501        barrier.wait();
1502        let snapshot1 = collector.collect_and_reset();
1503        for h in handles {
1504            h.join().unwrap();
1505        }
1506        // After all threads finish, drain whatever remains.
1507        let snapshot2 = collector.collect_and_reset();
1508
1509        // Reconstruct count and sum from the two snapshots.
1510        // We can't easily expose the packed atomic, so we use
1511        // the per-window averages and event counts as a
1512        // consistency check.
1513        let total_events = snapshot1.event_rate + snapshot2.event_rate;
1514        assert_eq!(
1515            total_events as usize,
1516            PUSHERS * PUSHES_PER_THREAD,
1517            "all pushes must be accounted for"
1518        );
1519
1520        // For each window: if event_rate > 0, avg_push_latency
1521        // must be > 0 (non-zero average) — pre-fix the desync
1522        // could land event_rate > 0 with avg = 0 (count
1523        // captured without sum, or sum without count → div-by-
1524        // zero clamped to 0). This is the directly visible
1525        // symptom of the desync.
1526        //
1527        // events_in_window is incremented separately from the
1528        // packed (sum, count) word, so a strict assertion of
1529        // "event_rate is exactly count" isn't safe — but the
1530        // weaker invariant "if any pushes were captured in the
1531        // (sum,count) word, the average is non-zero" survives
1532        // the packed-atomic fix.
1533        for snap in [&snapshot1, &snapshot2] {
1534            if snap.avg_push_latency_ns == 0 {
1535                // Either no pushes were captured in this
1536                // window, or — pre-fix — sum/count desynced.
1537                // The post-fix shape can only produce
1538                // avg=0 when count is also 0; we can't read
1539                // count directly from ShardMetrics, but
1540                // exercise the invariant by confirming the
1541                // OTHER window's sum is consistent with all
1542                // pushes.
1543                continue;
1544            }
1545            assert!(
1546                snap.avg_push_latency_ns >= 1,
1547                "regression: a window with non-zero avg must have \
1548                 a positive sum (pre-fix sum-without-count desync \
1549                 produced avg=0 with non-zero events)"
1550            );
1551        }
1552    }
1553
1554    #[test]
1555    fn test_draining_excludes_from_selection() {
1556        let policy = ScalingPolicy {
1557            min_shards: 1,
1558            cooldown: Duration::from_nanos(1),
1559            ..Default::default()
1560        };
1561        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1562
1563        // Drain one shard
1564        let drained = mapper.scale_down(1).unwrap();
1565        assert_eq!(drained.len(), 1);
1566
1567        // All selections should go to the remaining active shard
1568        let active_ids = mapper.active_shard_ids();
1569        assert_eq!(active_ids.len(), 1);
1570
1571        for i in 0..100u64 {
1572            let selected = mapper.select_shard(i);
1573            assert!(active_ids.contains(&selected));
1574        }
1575    }
1576
1577    #[test]
1578    fn test_policy_validation() {
1579        let invalid_policy = ScalingPolicy {
1580            fill_ratio_threshold: 1.5, // Invalid
1581            ..Default::default()
1582        };
1583        assert!(invalid_policy.validate().is_err());
1584
1585        // Without normalize(), this would be invalid
1586        let invalid_policy2 = ScalingPolicy {
1587            min_shards: 10,
1588            max_shards: 5,
1589            ..Default::default()
1590        };
1591        assert!(invalid_policy2.validate().is_err());
1592    }
1593
1594    #[test]
1595    fn test_policy_normalize_auto_adjusts_max_shards() {
1596        // When min_shards > max_shards, normalize() should adjust max_shards
1597        let policy = ScalingPolicy {
1598            min_shards: 8,
1599            max_shards: 2, // Less than min_shards
1600            ..Default::default()
1601        };
1602
1603        let normalized = policy.normalize();
1604        assert_eq!(
1605            normalized.max_shards, 8,
1606            "max_shards should be adjusted to min_shards"
1607        );
1608        assert!(
1609            normalized.validate().is_ok(),
1610            "normalized policy should be valid"
1611        );
1612    }
1613
1614    /// Regression: BUG_REPORT.md #7 — `scale_up` previously allocated
1615    /// new shard ids as `shards.iter().max() + 1`, which reused ids
1616    /// after the highest-numbered shard was drained-and-removed.
1617    /// Reusing ids merges two distinct shard lifetimes in any
1618    /// external metric/checkpoint system that keys on shard id.
1619    /// The fix uses a monotonic `next_shard_id` counter.
1620    #[test]
1621    fn scale_up_does_not_reuse_ids_after_remove() {
1622        let policy = ScalingPolicy {
1623            min_shards: 1,
1624            max_shards: 16,
1625            cooldown: Duration::from_nanos(1),
1626            ..Default::default()
1627        };
1628        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1629
1630        // Initial ids are 0 and 1. Scale up to 4 — new ids must be
1631        // 2 and 3 (the next two slots from the monotonic counter).
1632        let new_ids = mapper.scale_up(2).unwrap();
1633        assert_eq!(new_ids, vec![2, 3]);
1634
1635        // Drain one shard. `scale_down` picks by lowest weight, so
1636        // we don't get to choose which id is drained. Whichever it
1637        // is, we then force it through Stopped + remove and check
1638        // that the removed id is *not* reissued by the next
1639        // scale_up.
1640        let drained = mapper.scale_down(1).unwrap();
1641        let drained_id = drained[0];
1642        for shard in mapper.shards.write().iter_mut() {
1643            if shard.id == drained_id {
1644                shard.state = ShardState::Stopped;
1645            }
1646        }
1647        let removed = mapper.remove_stopped_shards();
1648        assert_eq!(removed, vec![drained_id]);
1649
1650        // Scale up by 1. The broken `max(existing_ids) + 1` allocator
1651        // could revive the just-removed id whenever `drained_id` had
1652        // been the highest id present (e.g. id 3 with 0/1/2/3 → 0/1/2
1653        // → next would be 3 again). The fix uses the monotonic
1654        // counter, which sits at 4 after the earlier scale_up, so
1655        // the new id must be 4 regardless of which id was drained.
1656        let new_ids = mapper.scale_up(1).unwrap();
1657        assert_eq!(
1658            new_ids,
1659            vec![4],
1660            "shard id {drained_id} was just removed; reusing any \
1661             previously-issued id would merge two distinct shard \
1662             lifetimes in external systems"
1663        );
1664    }
1665
1666    #[test]
1667    fn test_policy_normalize_preserves_valid_config() {
1668        // When max_shards >= min_shards, normalize() should not change anything
1669        let policy = ScalingPolicy {
1670            min_shards: 4,
1671            max_shards: 16,
1672            ..Default::default()
1673        };
1674
1675        let normalized = policy.normalize();
1676        assert_eq!(normalized.min_shards, 4);
1677        assert_eq!(normalized.max_shards, 16);
1678    }
1679
1680    #[test]
1681    fn test_shard_mapper_normalizes_policy() {
1682        // ShardMapper should accept a policy where min_shards > default max_shards
1683        // because it calls normalize() internally
1684        let policy = ScalingPolicy {
1685            min_shards: 4,
1686            ..Default::default()
1687        };
1688
1689        // This should succeed even on machines with < 4 CPUs
1690        let result = ShardMapper::new(4, 1024, policy);
1691        assert!(
1692            result.is_ok(),
1693            "ShardMapper should normalize policy automatically"
1694        );
1695    }
1696
1697    #[test]
1698    fn test_shard_mapper_adjusts_max_shards_to_initial_count() {
1699        // ShardMapper should adjust max_shards to accommodate initial_shards
1700        // even if initial_shards > default max_shards (CPU count)
1701        let policy = ScalingPolicy::default();
1702
1703        // Create mapper with 8 initial shards - should work even on 2-core machines
1704        let result = ShardMapper::new(8, 1024, policy);
1705        assert!(
1706            result.is_ok(),
1707            "ShardMapper should adjust max_shards to initial_shards"
1708        );
1709
1710        let mapper = result.unwrap();
1711        assert_eq!(mapper.active_shard_count(), 8);
1712
1713        // Verify the policy was adjusted
1714        assert!(
1715            mapper.policy().max_shards >= 8,
1716            "max_shards should be at least initial_shards"
1717        );
1718    }
1719
1720    #[test]
1721    fn test_scale_up_max_shards_concurrent() {
1722        use std::sync::Arc;
1723        use std::thread;
1724
1725        let policy = ScalingPolicy {
1726            max_shards: 10,
1727            cooldown: Duration::from_nanos(1), // Disable cooldown for test
1728            ..Default::default()
1729        };
1730        let mapper = Arc::new(ShardMapper::new(5, 1024, policy).unwrap());
1731
1732        // Spawn multiple threads that all try to scale up
1733        let mut handles = vec![];
1734        for _ in 0..5 {
1735            let mapper_clone = mapper.clone();
1736            handles.push(thread::spawn(move || mapper_clone.scale_up(3)));
1737        }
1738
1739        // Collect results
1740        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1741
1742        // Some should succeed, some should fail with AtMaxShards
1743        let successes: Vec<_> = results.iter().filter(|r| r.is_ok()).collect();
1744        let failures: Vec<_> = results
1745            .iter()
1746            .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
1747            .collect();
1748
1749        // We started with 5, max is 10, each tries to add 3
1750        // At most we can add 5 more shards, so at most 1-2 can succeed
1751        assert!(
1752            !successes.is_empty() || !failures.is_empty(),
1753            "at least some operations should complete"
1754        );
1755
1756        // Final count should not exceed max_shards
1757        assert!(
1758            mapper.active_shard_count() <= 10,
1759            "should never exceed max_shards, got {}",
1760            mapper.active_shard_count()
1761        );
1762    }
1763
1764    /// Multiple `scale_up_provisioning(1)` calls must
1765    /// never push `active_count` past `max_shards` via subsequent
1766    /// `activate()` calls. Pre-fix the budget gate
1767    /// (`check_scale_up_budget`) only counted ALREADY-active
1768    /// shards, so several `scale_up_provisioning` calls could each
1769    /// pass and then each `activate()` unconditionally bumped
1770    /// `active_count` past the cap.
1771    ///
1772    /// Setup: at the budget edge, allocate two Provisioning shards
1773    /// in sequence (both pass the gate because `active_count`
1774    /// hasn't moved). The first `activate()` fills the budget;
1775    /// the second must surface `AtMaxShards` rather than overflow.
1776    #[test]
1777    fn activate_rejects_when_active_count_would_exceed_max_shards() {
1778        let policy = ScalingPolicy {
1779            min_shards: 1,
1780            max_shards: 4,
1781            cooldown: Duration::from_nanos(1),
1782            ..Default::default()
1783        };
1784        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1785        // active_count = 3, max = 4. Allocate TWO Provisioning
1786        // shards; both pass the budget gate (it sees active_count=3).
1787        let ids_a = mapper.scale_up_provisioning(1).unwrap();
1788        // The 1ns cooldown elapses on every realistic scheduler
1789        // tick; if we lose that race, retry. Release-mode and
1790        // some debug-build paths occasionally finish the first
1791        // allocation in <1ns of wall time.
1792        let ids_b = loop {
1793            match mapper.scale_up_provisioning(1) {
1794                Ok(v) => break v,
1795                Err(ScalingError::InCooldown) => {
1796                    std::thread::sleep(Duration::from_nanos(50));
1797                }
1798                Err(e) => panic!("unexpected error from scale_up_provisioning: {:?}", e),
1799            }
1800        };
1801        assert_eq!(ids_a.len(), 1);
1802        assert_eq!(ids_b.len(), 1);
1803
1804        // First activate succeeds (active_count goes 3 → 4 = max).
1805        mapper
1806            .activate(ids_a[0])
1807            .expect("first activate must succeed");
1808        assert_eq!(mapper.active_shard_count(), 4);
1809
1810        // Second activate must REFUSE — it would push past max.
1811        let err = mapper
1812            .activate(ids_b[0])
1813            .expect_err("second activate must reject");
1814        assert!(
1815            matches!(err, ScalingError::AtMaxShards),
1816            "expected AtMaxShards, got {:?}",
1817            err
1818        );
1819        // active_count must still be at the cap, not over it.
1820        assert_eq!(mapper.active_shard_count(), 4);
1821    }
1822
1823    /// Pin: under contention, the active_count never transiently
1824    /// exceeds `max_shards`. Pre-fix `activate` released the
1825    /// shards write-lock BEFORE the `fetch_add(1)`, so two
1826    /// concurrent activators could each pass the budget gate
1827    /// (both reading the pre-bump count) and both bump,
1828    /// overshooting the cap by 1 at any observation point.
1829    #[test]
1830    fn concurrent_activate_never_exceeds_max_shards() {
1831        use std::sync::Arc;
1832        use std::sync::Barrier;
1833        use std::thread;
1834
1835        const ITERATIONS: usize = 200;
1836
1837        for iter in 0..ITERATIONS {
1838            let policy = ScalingPolicy {
1839                min_shards: 1,
1840                max_shards: 4,
1841                cooldown: Duration::from_nanos(1),
1842                ..Default::default()
1843            };
1844            // Start at active=3, allocate two Provisioning ids so
1845            // both threads have a candidate to activate. With the
1846            // pre-fix race: thread A loads count=3, validates,
1847            // sets state Active, drops lock; thread B loads count
1848            // (still 3), validates, sets state Active, drops lock;
1849            // A bumps → 4; B bumps → 5. Post-fix B observes
1850            // count=4 inside its lock and rejects with
1851            // AtMaxShards.
1852            let mapper = Arc::new(ShardMapper::new(3, 1024, policy).unwrap());
1853            let ids_a = mapper.scale_up_provisioning(1).unwrap();
1854            // The 1ns cooldown elapses on every realistic
1855            // scheduler tick; if we lose that race, retry once
1856            // (release-mode iterations occasionally finish the
1857            // first allocation in <1ns of wall time).
1858            let ids_b = loop {
1859                match mapper.scale_up_provisioning(1) {
1860                    Ok(v) => break v,
1861                    Err(ScalingError::InCooldown) => {
1862                        std::thread::sleep(Duration::from_micros(10));
1863                        continue;
1864                    }
1865                    Err(e) => panic!("unexpected error: {:?}", e),
1866                }
1867            };
1868            assert_eq!(ids_a.len(), 1);
1869            assert_eq!(ids_b.len(), 1);
1870
1871            let barrier = Arc::new(Barrier::new(2));
1872            let m1 = mapper.clone();
1873            let m2 = mapper.clone();
1874            let b1 = barrier.clone();
1875            let b2 = barrier.clone();
1876            let id_a = ids_a[0];
1877            let id_b = ids_b[0];
1878
1879            let h1 = thread::spawn(move || {
1880                b1.wait();
1881                m1.activate(id_a)
1882            });
1883            let h2 = thread::spawn(move || {
1884                b2.wait();
1885                m2.activate(id_b)
1886            });
1887            let r1 = h1.join().expect("thread A panicked");
1888            let r2 = h2.join().expect("thread B panicked");
1889
1890            // Exactly one must succeed and one must reject with
1891            // AtMaxShards.
1892            let ok_count = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
1893            let at_max_count = [&r1, &r2]
1894                .iter()
1895                .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
1896                .count();
1897            assert_eq!(
1898                ok_count, 1,
1899                "iter {}: expected exactly one Ok, got r1={:?} r2={:?}",
1900                iter, r1, r2
1901            );
1902            assert_eq!(
1903                at_max_count, 1,
1904                "iter {}: expected exactly one AtMaxShards, got r1={:?} r2={:?}",
1905                iter, r1, r2
1906            );
1907
1908            // active_count must not exceed max_shards at any
1909            // observation point.
1910            assert!(
1911                mapper.active_shard_count() <= 4,
1912                "iter {}: active_count={} exceeded max_shards=4",
1913                iter,
1914                mapper.active_shard_count(),
1915            );
1916        }
1917    }
1918
1919    /// Two concurrent `scale_up(1)` calls must never both succeed
1920    /// inside a single cooldown window. Before the fix, the
1921    /// cooldown check happened only under a read lock that was
1922    /// released *before* the mutating write lock, so two threads
1923    /// could both observe `last_scaling=None` (or stale), both
1924    /// acquire the write lock in turn, and both succeed — racing
1925    /// past the cooldown floor and (on a max-shard-bounded
1926    /// scenario) potentially the `max_shards` cap.
1927    ///
1928    /// Pin: across `ITERATIONS` rounds of two-thread races, every
1929    /// iteration sees exactly one success and one `InCooldown`.
1930    #[test]
1931    fn cooldown_is_enforced_under_concurrent_scale_up() {
1932        use std::sync::Arc;
1933        use std::sync::Barrier;
1934        use std::thread;
1935
1936        const ITERATIONS: usize = 1_000;
1937
1938        for iter in 0..ITERATIONS {
1939            let policy = ScalingPolicy {
1940                min_shards: 1,
1941                max_shards: 16,
1942                // Large cooldown so a single iteration can't pass
1943                // the floor between the two threads' calls.
1944                cooldown: Duration::from_secs(60),
1945                ..Default::default()
1946            };
1947            let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
1948            let barrier = Arc::new(Barrier::new(2));
1949
1950            let m1 = mapper.clone();
1951            let b1 = barrier.clone();
1952            let m2 = mapper.clone();
1953            let b2 = barrier.clone();
1954
1955            let h1 = thread::spawn(move || {
1956                b1.wait();
1957                m1.scale_up(1)
1958            });
1959            let h2 = thread::spawn(move || {
1960                b2.wait();
1961                m2.scale_up(1)
1962            });
1963
1964            let r1 = h1.join().unwrap();
1965            let r2 = h2.join().unwrap();
1966
1967            let oks = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
1968            let cooldowns = [&r1, &r2]
1969                .iter()
1970                .filter(|r| matches!(r, Err(ScalingError::InCooldown)))
1971                .count();
1972
1973            assert_eq!(
1974                oks, 1,
1975                "iter {iter}: expected exactly one Ok, got r1={r1:?}, r2={r2:?}"
1976            );
1977            assert_eq!(
1978                cooldowns, 1,
1979                "iter {iter}: expected exactly one InCooldown, got r1={r1:?}, r2={r2:?}"
1980            );
1981            assert_eq!(
1982                mapper.active_shard_count(),
1983                3,
1984                "iter {iter}: cooldown violated — both calls mutated state (shard count {})",
1985                mapper.active_shard_count()
1986            );
1987        }
1988    }
1989
1990    #[test]
1991    fn test_scale_up_overflow_protection() {
1992        // Create mapper with a high starting shard ID to test overflow
1993        // protection. The monotonic id allocator advances by 1 per
1994        // shard regardless of which shards have been removed, so we
1995        // bump `next_shard_id` directly to simulate a near-`u16::MAX`
1996        // state.
1997        let policy = ScalingPolicy {
1998            max_shards: u16::MAX,
1999            ..Default::default()
2000        };
2001        let mapper = ShardMapper::new(1, 1024, policy).unwrap();
2002
2003        // Position the allocator so the next id is u16::MAX - 1.
2004        // Trying to allocate 3 ids would need {MAX-1, MAX, MAX+1};
2005        // the last is unrepresentable in `u16`, so scale_up rejects.
2006        mapper
2007            .next_shard_id
2008            .store(u16::MAX - 1, AtomicOrdering::Relaxed);
2009
2010        let result = mapper.scale_up(3);
2011        assert!(matches!(result, Err(ScalingError::AtMaxShards)));
2012
2013        // Adding 1 shard should still work (id = MAX - 1).
2014        let result = mapper.scale_up(1);
2015        assert!(result.is_ok());
2016    }
2017
2018    #[test]
2019    fn test_evaluate_scaling_auto_scale_disabled() {
2020        let policy = ScalingPolicy {
2021            auto_scale: false,
2022            ..Default::default()
2023        };
2024        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2025
2026        let decision = mapper.evaluate_scaling();
2027        assert!(matches!(decision, ScalingDecision::None));
2028    }
2029
2030    #[test]
2031    fn test_evaluate_scaling_in_cooldown() {
2032        let policy = ScalingPolicy {
2033            cooldown: Duration::from_secs(60),
2034            ..Default::default()
2035        };
2036        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2037
2038        // Trigger a scaling operation to start cooldown
2039        *mapper.last_scaling.write() = Some(Instant::now());
2040
2041        let decision = mapper.evaluate_scaling();
2042        assert!(matches!(decision, ScalingDecision::None));
2043    }
2044
2045    #[test]
2046    fn test_evaluate_scaling_scale_up_on_high_fill_ratio() {
2047        let policy = ScalingPolicy {
2048            fill_ratio_threshold: 0.7,
2049            max_shards: 8,
2050            cooldown: Duration::from_nanos(1),
2051            ..Default::default()
2052        };
2053        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2054
2055        // Set high fill ratio on majority of shards
2056        {
2057            let mut shards = mapper.shards.write();
2058            for shard in shards.iter_mut() {
2059                shard.last_metrics.fill_ratio = 0.9; // Above threshold
2060            }
2061        }
2062
2063        let decision = mapper.evaluate_scaling();
2064        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2065    }
2066
2067    #[test]
2068    fn test_evaluate_scaling_scale_up_on_high_latency() {
2069        let policy = ScalingPolicy {
2070            push_latency_threshold_ns: 10,
2071            max_shards: 8,
2072            cooldown: Duration::from_nanos(1),
2073            ..Default::default()
2074        };
2075        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2076
2077        // Set high push latency on majority of shards
2078        {
2079            let mut shards = mapper.shards.write();
2080            for shard in shards.iter_mut() {
2081                shard.last_metrics.avg_push_latency_ns = 100; // Above threshold
2082            }
2083        }
2084
2085        let decision = mapper.evaluate_scaling();
2086        assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2087    }
2088
2089    #[test]
2090    fn test_evaluate_scaling_scale_down_on_underutilized() {
2091        let policy = ScalingPolicy {
2092            underutilized_threshold: 0.2,
2093            min_shards: 2,
2094            cooldown: Duration::from_nanos(1),
2095            ..Default::default()
2096        };
2097        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2098
2099        // Set low fill ratio and zero event rate on majority of shards
2100        {
2101            let mut shards = mapper.shards.write();
2102            for shard in shards.iter_mut() {
2103                shard.last_metrics.fill_ratio = 0.05; // Below threshold
2104                shard.last_metrics.event_rate = 0;
2105            }
2106        }
2107
2108        let decision = mapper.evaluate_scaling();
2109        assert!(matches!(decision, ScalingDecision::ScaleDown(_)));
2110    }
2111
2112    /// Regression: a freshly-activated shard must NOT
2113    /// immediately count toward the underutilized tally on the
2114    /// next `evaluate_scaling`. Pre-fix the new shard's
2115    /// `last_metrics` was the `ShardMetrics::new(id)` placeholder
2116    /// (`fill_ratio = 0.0, event_rate = 0`), which matched the
2117    /// underutilized trigger immediately — oscillating the
2118    /// system: scale-up → next tick scale-down → next tick
2119    /// scale-up.
2120    ///
2121    /// Post-fix `MappedShard.activated_at` is stamped at the
2122    /// Provisioning → Active transition (and at scale-up
2123    /// construction for `Active`-from-create shards), and
2124    /// `evaluate_scaling` skips shards within `policy.cooldown`
2125    /// of activation.
2126    #[test]
2127    fn freshly_added_shard_skipped_from_evaluate_scaling_warmup() {
2128        let policy = ScalingPolicy {
2129            underutilized_threshold: 0.2,
2130            min_shards: 1,
2131            // Long cooldown so the warmup window stays open
2132            // throughout the test.
2133            cooldown: Duration::from_secs(60),
2134            ..Default::default()
2135        };
2136        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2137
2138        // Direct manipulation of the shard list: pin two boot
2139        // shards as underutilized (boot stamps put them outside
2140        // the warmup window), and inject one freshly-activated
2141        // shard with `activated_at = now()` whose placeholder
2142        // metrics ALSO trigger the underutilized predicate.
2143        // Without the warmup skip, the count would be 3 of 3
2144        // shards underutilized → scale-down. WITH the warmup
2145        // skip, only the 2 boot shards count, and 2 of 3 still
2146        // exceeds the 3/2 = 1 majority — scale-down still fires
2147        // (driven by the boot shards), but the decisive property
2148        // is that the fresh shard is correctly excluded.
2149        {
2150            let mut shards = mapper.shards.write();
2151            for shard in shards.iter_mut() {
2152                shard.last_metrics.fill_ratio = 0.05;
2153                shard.last_metrics.event_rate = 0;
2154            }
2155            // The third shard is "fresh": stamp activated_at to
2156            // now, simulating a just-activated shard.
2157            shards[2].activated_at = Instant::now();
2158        }
2159
2160        // The fresh shard must satisfy the warmup predicate.
2161        let now = Instant::now();
2162        let warmup_excluded: Vec<u16> = mapper
2163            .shards
2164            .read()
2165            .iter()
2166            .filter(|s| now.duration_since(s.activated_at) < mapper.policy.cooldown)
2167            .map(|s| s.id)
2168            .collect();
2169        assert_eq!(
2170            warmup_excluded,
2171            vec![2u16],
2172            "regression: only shard id 2 (just-stamped) should be \
2173             within the warmup window; boot shards are stamped \
2174             1 hour in the past"
2175        );
2176
2177        // And evaluate_scaling must still produce ScaleDown
2178        // (driven by the 2 boot shards) — the fresh shard's
2179        // placeholder doesn't get to vote.
2180        let decision = mapper.evaluate_scaling();
2181        assert!(
2182            matches!(decision, ScalingDecision::ScaleDown(_)),
2183            "scale-down still fires from the boot shards' real \
2184             underutilization, but driven by 2 of 3 not 3 of 3 — \
2185             got {:?}",
2186            decision,
2187        );
2188    }
2189
2190    #[test]
2191    fn test_evaluate_scaling_no_scale_up_at_max() {
2192        let policy = ScalingPolicy {
2193            fill_ratio_threshold: 0.7,
2194            max_shards: 4, // Already at max
2195            cooldown: Duration::from_nanos(1),
2196            ..Default::default()
2197        };
2198        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2199
2200        // Set high fill ratio
2201        {
2202            let mut shards = mapper.shards.write();
2203            for shard in shards.iter_mut() {
2204                shard.last_metrics.fill_ratio = 0.9;
2205            }
2206        }
2207
2208        let decision = mapper.evaluate_scaling();
2209        assert!(matches!(decision, ScalingDecision::None));
2210    }
2211
2212    #[test]
2213    fn test_evaluate_scaling_no_scale_down_at_min() {
2214        let policy = ScalingPolicy {
2215            underutilized_threshold: 0.2,
2216            min_shards: 4, // Already at min
2217            cooldown: Duration::from_nanos(1),
2218            ..Default::default()
2219        };
2220        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2221
2222        // Set underutilized metrics
2223        {
2224            let mut shards = mapper.shards.write();
2225            for shard in shards.iter_mut() {
2226                shard.last_metrics.fill_ratio = 0.05;
2227                shard.last_metrics.event_rate = 0;
2228            }
2229        }
2230
2231        let decision = mapper.evaluate_scaling();
2232        assert!(matches!(decision, ScalingDecision::None));
2233    }
2234
2235    #[test]
2236    fn test_evaluate_scaling_ignores_draining_shards() {
2237        let policy = ScalingPolicy {
2238            fill_ratio_threshold: 0.7,
2239            max_shards: 8,
2240            cooldown: Duration::from_nanos(1),
2241            ..Default::default()
2242        };
2243        let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2244
2245        // Set high fill ratio but mark shards as draining
2246        {
2247            let mut shards = mapper.shards.write();
2248            for shard in shards.iter_mut() {
2249                shard.last_metrics.fill_ratio = 0.9;
2250                shard.state = ShardState::Draining;
2251            }
2252        }
2253
2254        // Should not scale up since draining shards are ignored
2255        let decision = mapper.evaluate_scaling();
2256        assert!(matches!(decision, ScalingDecision::None));
2257    }
2258
2259    #[test]
2260    fn test_shard_metrics_new() {
2261        let metrics = ShardMetrics::new(5);
2262        assert_eq!(metrics.shard_id, 5);
2263        assert_eq!(metrics.fill_ratio, 0.0);
2264        assert_eq!(metrics.event_rate, 0);
2265        assert!(!metrics.draining);
2266    }
2267
2268    #[test]
2269    fn test_shard_metrics_compute_weight() {
2270        let mut metrics = ShardMetrics::new(0);
2271        metrics.fill_ratio = 0.5;
2272        metrics.avg_push_latency_ns = 100;
2273        metrics.event_rate = 1_000_000;
2274
2275        metrics.compute_weight();
2276        assert!(metrics.weight > 0.0);
2277    }
2278
2279    #[test]
2280    fn test_shard_metrics_draining_max_weight() {
2281        let mut metrics = ShardMetrics::new(0);
2282        metrics.draining = true;
2283        metrics.compute_weight();
2284        assert_eq!(metrics.weight, f64::MAX);
2285    }
2286
2287    #[test]
2288    fn test_scaling_decision_debug() {
2289        let none = ScalingDecision::None;
2290        let up = ScalingDecision::ScaleUp(2);
2291        let down = ScalingDecision::ScaleDown(1);
2292
2293        assert!(format!("{:?}", none).contains("None"));
2294        assert!(format!("{:?}", up).contains("ScaleUp"));
2295        assert!(format!("{:?}", down).contains("ScaleDown"));
2296    }
2297
2298    /// Regression: BUG_REPORT.md #46 — `scale_up_provisioning`
2299    /// allocates a shard but `select_shard` must NOT route to it
2300    /// until `activate` has been called. This is the load-bearing
2301    /// invariant that lets `EventBus::add_shard_internal` wire up
2302    /// drain workers before producers can land in the new ring
2303    /// buffer.
2304    #[test]
2305    fn provisioning_shard_is_not_selectable_until_activated() {
2306        let policy = ScalingPolicy {
2307            min_shards: 1,
2308            max_shards: 16,
2309            cooldown: Duration::from_nanos(1),
2310            ..Default::default()
2311        };
2312        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2313
2314        // Allocate a provisioning shard. Existing ids are 0,1; new
2315        // is 2.
2316        let new_ids = mapper.scale_up_provisioning(1).unwrap();
2317        assert_eq!(new_ids, vec![2]);
2318
2319        // The provisioning shard must NOT appear in active accounting.
2320        assert_eq!(mapper.active_shard_count(), 2);
2321        assert_eq!(mapper.shard_state(2), Some(ShardState::Provisioning));
2322
2323        // Across many hashes, `select_shard` must never pick id 2.
2324        // Spread the input hashes across the u64 range so the
2325        // unbiased Lemire-style mapping actually
2326        // distributes — sequential small ids would all map to
2327        // index 0 because `(small * len) >> 64 = 0`. Production
2328        // callers pass `xxh3_64`-hashed event payloads, which
2329        // are uniform u64s; this scaling mirrors that.
2330        let mut seen: std::collections::HashSet<u16> = std::collections::HashSet::new();
2331        let stride = u64::MAX / 10_000;
2332        for i in 0u64..10_000 {
2333            seen.insert(mapper.select_shard(i.wrapping_mul(stride)));
2334        }
2335        assert!(
2336            !seen.contains(&2),
2337            "provisioning shard 2 was selected — \
2338             producers would push into a ring buffer with no consumer (#46)"
2339        );
2340        assert!(seen == [0, 1].into_iter().collect());
2341
2342        // After `activate`, the shard is selectable.
2343        mapper.activate(2).unwrap();
2344        assert_eq!(mapper.shard_state(2), Some(ShardState::Active));
2345        assert_eq!(mapper.active_shard_count(), 3);
2346
2347        let mut seen_after: std::collections::HashSet<u16> = std::collections::HashSet::new();
2348        let stride = u64::MAX / 10_000;
2349        for i in 0u64..10_000 {
2350            seen_after.insert(mapper.select_shard(i.wrapping_mul(stride)));
2351        }
2352        assert!(
2353            seen_after.contains(&2),
2354            "after activate, shard 2 should be a valid select_shard target"
2355        );
2356    }
2357
2358    /// Regression: BUG_REPORT.md #51 — when no `Active` shard exists
2359    /// (e.g. all shards are Draining or Provisioning), `select_shard`
2360    /// must NOT fall back to a Draining shard. Pushing into a
2361    /// draining shard increments `pushes_since_drain_start` and
2362    /// blocks finalization indefinitely.
2363    #[test]
2364    fn select_shard_does_not_fall_back_to_draining() {
2365        let policy = ScalingPolicy {
2366            min_shards: 0,
2367            max_shards: 8,
2368            cooldown: Duration::from_nanos(1),
2369            ..Default::default()
2370        };
2371        // Force min_shards = 0 via direct construction so we can drain
2372        // every shard.
2373        let mut policy = policy;
2374        policy.min_shards = 1;
2375        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2376
2377        // Force every shard into Draining state directly. We bypass
2378        // `scale_down`'s min_shards floor by mutating the test-only
2379        // accessor; this models a state that can otherwise be
2380        // reached by `drain_specific` calls.
2381        {
2382            let mut shards = mapper.shards.write();
2383            for s in shards.iter_mut() {
2384                s.state = ShardState::Draining;
2385                s.metrics.set_draining(true);
2386            }
2387        }
2388
2389        // The fallback must return the OOB sentinel `u16::MAX` rather
2390        // than a draining shard id (0 or 1). The upstream
2391        // `resolve_idx` path will see no match for `u16::MAX` and
2392        // surface as `Unrouted` (#44), which is the correct signal:
2393        // "no destination, do not push."
2394        for hash in 0u64..1_000 {
2395            let picked = mapper.select_shard(hash);
2396            assert_eq!(
2397                picked,
2398                u16::MAX,
2399                "fallback returned a draining shard ({}); pushes there would \
2400                 deadlock finalize_draining (#51)",
2401                picked
2402            );
2403        }
2404    }
2405
2406    /// Regression: BUG_REPORT.md #32 — `scale_up(0)` previously
2407    /// bumped the cooldown timestamp and could spuriously fail at
2408    /// `u16::MAX` even though zero ids were being allocated.
2409    #[test]
2410    fn scale_up_zero_is_a_noop() {
2411        let policy = ScalingPolicy {
2412            cooldown: Duration::from_secs(60),
2413            ..Default::default()
2414        };
2415        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2416        // Pretend we just scaled — cooldown is active.
2417        *mapper.last_scaling.write() = Some(Instant::now());
2418
2419        // scale_up(0) must not return InCooldown and must not bump
2420        // last_scaling.
2421        let before_ts = *mapper.last_scaling.read();
2422        let r = mapper.scale_up(0);
2423        assert!(
2424            r.is_ok(),
2425            "scale_up(0) should succeed as a no-op, got {r:?}"
2426        );
2427        assert_eq!(r.unwrap().len(), 0);
2428        let after_ts = *mapper.last_scaling.read();
2429        assert_eq!(before_ts, after_ts, "scale_up(0) bumped cooldown timestamp");
2430
2431        // Also: position the allocator at u16::MAX and verify a
2432        // count==0 call doesn't trip the sentinel check.
2433        mapper
2434            .next_shard_id
2435            .store(u16::MAX, AtomicOrdering::Relaxed);
2436        assert!(mapper.scale_up(0).is_ok());
2437    }
2438
2439    /// Regression: `drain_specific` must bump `last_scaling` so
2440    /// a subsequent `scale_up` is gated by the cooldown floor.
2441    /// Pre-fix `scale_down` wrote `last_scaling` but
2442    /// `drain_specific` did not, so the sequence
2443    /// `drain_specific(id) → scale_up(N)` bypassed the cooldown
2444    /// — even though both decrement `active_count` and so
2445    /// should be symmetric from the budget-math perspective.
2446    #[test]
2447    fn drain_specific_bumps_last_scaling_so_scale_up_respects_cooldown() {
2448        let policy = ScalingPolicy {
2449            min_shards: 1,
2450            max_shards: 8,
2451            // A cooldown long enough that `Instant::now()` won't
2452            // accidentally elapse it during the test, but short
2453            // enough not to slow CI.
2454            cooldown: Duration::from_secs(60),
2455            ..Default::default()
2456        };
2457        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2458
2459        // Pre-condition: no prior scaling action.
2460        assert!(mapper.last_scaling.read().is_none());
2461
2462        let before = Instant::now();
2463        mapper.drain_specific(0).unwrap();
2464        let after_ts = mapper
2465            .last_scaling
2466            .read()
2467            .expect("drain_specific must record a `last_scaling` timestamp");
2468        // Sanity: the recorded timestamp is in the test window.
2469        assert!(
2470            after_ts >= before,
2471            "last_scaling must be bumped to a current Instant (got {:?}, before was {:?})",
2472            after_ts,
2473            before
2474        );
2475
2476        // The decisive sealed property: a follow-up scale_up
2477        // must trip the cooldown gate. Pre-fix this would have
2478        // succeeded immediately because `last_scaling` was never
2479        // written by `drain_specific`.
2480        let err = mapper
2481            .scale_up(1)
2482            .expect_err("scale_up immediately after drain_specific must hit cooldown");
2483        match err {
2484            ScalingError::InCooldown => {} // expected
2485            other => panic!("expected InCooldown after drain_specific, got {:?}", other),
2486        }
2487    }
2488
2489    /// Regression: BUG_REPORT.md #48 — `drain_specific` must
2490    /// transition the shard's `MappedShard.state` to `Draining` so
2491    /// that `select_shard` stops routing to it. The previous
2492    /// `drain_shard` only flipped a metrics atomic.
2493    #[test]
2494    fn drain_specific_takes_shard_out_of_select() {
2495        let policy = ScalingPolicy {
2496            min_shards: 1,
2497            max_shards: 8,
2498            cooldown: Duration::from_nanos(1),
2499            ..Default::default()
2500        };
2501        let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2502
2503        // Drain shard 0.
2504        mapper.drain_specific(0).unwrap();
2505        assert_eq!(mapper.shard_state(0), Some(ShardState::Draining));
2506        assert_eq!(mapper.active_shard_count(), 2);
2507
2508        // Across many hashes, select_shard must not pick id 0.
2509        for hash in 0u64..10_000 {
2510            let picked = mapper.select_shard(hash);
2511            assert_ne!(
2512                picked, 0,
2513                "select_shard returned a Draining shard id 0 — \
2514                 producer pushes there would block finalize_draining (#48)"
2515            );
2516        }
2517    }
2518
2519    /// Regression: BUG_REPORT.md #33 — `set_draining(true)` and a
2520    /// concurrent `record_push` race on `pushes_since_drain_start`.
2521    /// The race itself can't be eliminated without a CAS loop on
2522    /// the counter (which would penalize the hot path), so the
2523    /// best we can pin is: after the dust settles, the counter
2524    /// value is bounded — never larger than the number of pushes
2525    /// that genuinely overlapped the transition. This catches a
2526    /// regression where the store-zero stops happening, where the
2527    /// flag publish stops happening, or where future code adds
2528    /// drift that compounds the race across many transitions.
2529    #[test]
2530    fn set_draining_resets_counter_under_concurrent_pushes() {
2531        use std::sync::Barrier;
2532        use std::thread;
2533
2534        const ITERATIONS: usize = 200;
2535        const PUSHERS: usize = 4;
2536
2537        for _ in 0..ITERATIONS {
2538            let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
2539
2540            // Sanity: counter starts at zero.
2541            assert_eq!(collector.pushes_since_drain_start(), 0);
2542
2543            // Pre-load with a noticeable amount of "before drain"
2544            // pushes so the reset has work to do.
2545            for _ in 0..50 {
2546                collector.record_push(1);
2547            }
2548            assert_eq!(collector.pushes_since_drain_start(), 50);
2549
2550            // Race: every pusher hammers `record_push` while one
2551            // thread calls `set_draining(true)`. After the barrier,
2552            // we want to observe that the counter ends up bounded
2553            // by PUSHERS (the number of pushes that genuinely
2554            // overlapped the transition) — and crucially NOT 50+,
2555            // which is what the buggy code with a missing reset
2556            // would leave behind.
2557            let barrier = Arc::new(Barrier::new(PUSHERS + 1));
2558            let mut handles = Vec::with_capacity(PUSHERS);
2559            for _ in 0..PUSHERS {
2560                let c = Arc::clone(&collector);
2561                let b = Arc::clone(&barrier);
2562                handles.push(thread::spawn(move || {
2563                    b.wait();
2564                    c.record_push(1);
2565                }));
2566            }
2567
2568            barrier.wait();
2569            collector.set_draining(true);
2570            for h in handles {
2571                h.join().unwrap();
2572            }
2573
2574            // After reset + at-most-PUSHERS racing pushes, the
2575            // counter is bounded. The strong guarantee we want is
2576            // "the 50 pre-drain pushes did NOT survive the reset."
2577            // Anything <= PUSHERS is acceptable — the race may
2578            // count any subset of the racing pushes.
2579            let final_count = collector.pushes_since_drain_start();
2580            assert!(
2581                final_count <= PUSHERS as u64,
2582                "set_draining reset is broken: counter is {} after reset, \
2583                 expected at most {} (#33)",
2584                final_count,
2585                PUSHERS
2586            );
2587        }
2588    }
2589
2590    /// Regression: BUG_REPORT.md #49 — `finalize_draining` must
2591    /// drop the `shards` write lock before calling `on_shard_removed`,
2592    /// so a callback that re-enters the mapper (read methods like
2593    /// `shard_state`, `active_shard_ids`, etc.) does not deadlock.
2594    #[test]
2595    fn finalize_draining_does_not_deadlock_on_callback_reentry() {
2596        use std::sync::Mutex;
2597        let policy = ScalingPolicy {
2598            min_shards: 1,
2599            max_shards: 8,
2600            cooldown: Duration::from_nanos(1),
2601            ..Default::default()
2602        };
2603        let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
2604
2605        // Set a callback that re-enters the mapper. Before the fix
2606        // this acquires `shards.read()` while finalize_draining
2607        // still holds `shards.write()`, deadlocking on parking_lot's
2608        // non-recursive RwLock.
2609        type Observation = (u16, Option<ShardState>);
2610        let observed_states: Arc<Mutex<Vec<Observation>>> = Arc::new(Mutex::new(Vec::new()));
2611        {
2612            let mapper_for_cb = Arc::clone(&mapper);
2613            let observed = Arc::clone(&observed_states);
2614            mapper.set_on_shard_removed(move |id| {
2615                let st = mapper_for_cb.shard_state(id);
2616                observed.lock().unwrap().push((id, st));
2617            });
2618        }
2619
2620        // Drive a shard all the way to Stopped: drain it, mark its
2621        // metrics empty (current_len = 0), and drop drain_started
2622        // far enough back that the 100ms gate is satisfied.
2623        mapper.drain_specific(0).unwrap();
2624        {
2625            let mut shards = mapper.shards.write();
2626            let s = shards.iter_mut().find(|s| s.id == 0).unwrap();
2627            s.metrics.current_len.store(0, AtomicOrdering::Relaxed);
2628            s.metrics
2629                .pushes_since_drain_start
2630                .store(0, AtomicOrdering::Relaxed);
2631            // Backdate drain_started so the elapsed > 100ms gate trips.
2632            s.drain_started = Some(Instant::now() - Duration::from_secs(1));
2633        }
2634
2635        // The call below would deadlock with the bug present.
2636        let stopped = mapper.finalize_draining();
2637        assert_eq!(stopped, vec![0]);
2638
2639        // The callback must have run AND been able to read state
2640        // (proving the lock was released).
2641        let observed = observed_states.lock().unwrap().clone();
2642        assert_eq!(observed.len(), 1);
2643        assert_eq!(observed[0].0, 0);
2644        // State should be `Stopped` (set by finalize_draining before
2645        // the lock was dropped).
2646        assert_eq!(observed[0].1, Some(ShardState::Stopped));
2647    }
2648
2649    /// `activate` must signal whether a state transition
2650    /// actually occurred so callers can avoid double-counting.
2651    /// Pre-fix it returned `Result<(), _>`, so a caller that
2652    /// invoked `activate` twice on the same shard incremented its
2653    /// own external counter (e.g. `ShardManager::num_shards`)
2654    /// twice for one logical transition.
2655    #[test]
2656    fn activate_returns_true_on_transition_and_false_on_idempotent_call() {
2657        let policy = ScalingPolicy {
2658            min_shards: 1,
2659            max_shards: 16,
2660            cooldown: Duration::from_nanos(1),
2661            ..Default::default()
2662        };
2663        let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2664
2665        // Newly-provisioned shard 2 → Active: real transition.
2666        let new_ids = mapper.scale_up_provisioning(1).unwrap();
2667        assert_eq!(new_ids, vec![2]);
2668        let first = mapper.activate(2).unwrap();
2669        assert!(
2670            first,
2671            "first activate on a Provisioning shard must return true"
2672        );
2673        assert_eq!(mapper.active_shard_count(), 3);
2674
2675        // Second activate on the same already-Active shard:
2676        // idempotent, no transition.
2677        let second = mapper.activate(2).unwrap();
2678        assert!(
2679            !second,
2680            "activate on an already-Active shard must return false; \
2681             pre-fix this returned Ok(()) and the caller couldn't tell"
2682        );
2683        // active_count must NOT have moved.
2684        assert_eq!(
2685            mapper.active_shard_count(),
2686            3,
2687            "idempotent activate must not bump active_count"
2688        );
2689    }
2690}