net/shard/mapper.rs
//! Dynamic shard mapping and scaling.
//!
//! This module implements dynamic shard scaling following Approach B from
//! DYNAMIC_SHARD_SCALING.md: increasing the number of shards and rebalancing
//! producers across them, while maintaining SPSC (single-producer single-consumer)
//! semantics per shard.
//!
//! # Core Principles
//!
//! - Shards never accept multiple producers
//! - Producers get moved to new shards when load increases
//! - Ingestion performance stays at 700M ops/sec per shard
//! - Ordering guarantees remain intact within a shard
//! - Total throughput scales linearly with shard count
//!
//! # Scaling Triggers
//!
//! - Ring buffer fill ratio > threshold (default 70%)
//! - Push latency exceeds threshold (default 5ns)
//! - Batch flush latency exceeds threshold
//! - Session/producer count growth
//!
//! # Architecture
//!
//! ```text
//!   Producers -----+
//!                  |
//!                  v
//!   +----------------------------+
//!   |    Dynamic Shard Mapper    |
//!   +----------------------------+
//!        |         |         |
//!        v         v         v
//!     Shard 0   Shard 1   Shard 2   ...
//! ```
35//! ```
36
37use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering as AtomicOrdering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use arc_swap::ArcSwap;
42use parking_lot::RwLock;
43
44use crate::config::ScalingPolicy;
45
/// Point-in-time load snapshot of one shard, consumed by the scaling
/// decision logic and by shard selection.
#[derive(Debug, Clone)]
pub struct ShardMetrics {
    /// Id of the shard this snapshot describes.
    pub shard_id: u16,
    /// Buffer occupancy as a fraction of capacity (nominally 0.0 - 1.0).
    pub fill_ratio: f64,
    /// Number of events ingested during the window this snapshot covers.
    pub event_rate: u64,
    /// Mean push latency over the window, in nanoseconds.
    pub avg_push_latency_ns: u64,
    /// Mean batch-flush latency over the window, in microseconds.
    pub avg_flush_latency_us: u64,
    /// `true` while the shard is in drain mode.
    pub draining: bool,
    /// Load score produced by [`ShardMetrics::compute_weight`]; smaller
    /// means less loaded.
    pub weight: f64,
    /// When this snapshot was taken.
    pub last_updated: Instant,
}

impl ShardMetrics {
    /// An idle, non-draining snapshot for `shard_id`: every counter,
    /// ratio and the weight are zero, and the timestamp is "now".
    pub fn new(shard_id: u16) -> Self {
        let last_updated = Instant::now();
        Self {
            last_updated,
            weight: 0.0,
            draining: false,
            avg_flush_latency_us: 0,
            avg_push_latency_ns: 0,
            event_rate: 0,
            fill_ratio: 0.0,
            shard_id,
        }
    }

    /// Recompute [`Self::weight`] from the current fields.
    ///
    /// The score is `fill_ratio * 100 + avg_push_latency_ns / 10 +
    /// event_rate / 1_000_000`, so a fuller, slower or busier shard
    /// scores higher. A draining shard is pinned to `f64::MAX` so it is
    /// never the least-loaded candidate.
    pub fn compute_weight(&mut self) {
        self.weight = if self.draining {
            f64::MAX
        } else {
            let fill = self.fill_ratio * 100.0;
            let latency = self.avg_push_latency_ns as f64 / 10.0;
            let rate = self.event_rate as f64 / 1_000_000.0;
            fill + latency + rate
        };
    }
}
100}
101
102/// Live metrics collector for a shard (atomics for lock-free updates).
103#[derive(Debug)]
104pub struct ShardMetricsCollector {
105 /// Shard identifier.
106 shard_id: u16,
107 /// Ring buffer capacity.
108 capacity: usize,
109 /// Current buffer length (updated by shard).
110 current_len: AtomicU64,
111 /// Events ingested in current window.
112 events_in_window: AtomicU64,
113 /// Packed `(count << 32) | sum` for push latencies (ns).
114 /// Pre-fix `push_latency_sum_ns` and `push_count` were
115 /// independent `AtomicU64`s. `record_push` did two separate
116 /// `fetch_add`s, and `collect_and_reset` did two separate
117 /// `swap`s. A metrics tick interleaving between the two
118 /// `fetch_add`s captured the sum WITHOUT the count (or
119 /// vice versa); the resulting `avg = sum.checked_div(count)
120 /// .unwrap_or(0)` returned 0 in window N (sum without
121 /// count) and 0 in window N+1 (count without sum), silently
122 /// zeroing the average that drives `evaluate_scaling`'s
123 /// push-latency scale-up trigger. Packing into one u64 makes
124 /// the `(sum, count)` update atomic; the upper 32 bits hold
125 /// the count (u32::MAX = 4G calls/window — plenty) and the
126 /// lower 32 hold the sum (u32::MAX = 4 G ns ≈ 4 s, also
127 /// plenty for any sane window).
128 push_latency: AtomicU64,
129 /// Packed `(count << 32) | sum` for flush latencies (us).
130 /// Same shape and rationale as `push_latency`. The lower 32
131 /// bits hold sum-µs (u32::MAX ≈ 4 Gµs ≈ 67 minutes — far
132 /// past any plausible window).
133 flush_latency: AtomicU64,
134 /// Whether this shard is draining.
135 draining: AtomicBool,
136 /// Window start time.
137 window_start: RwLock<Instant>,
138 /// Pushes observed since `set_draining(true)` was last called.
139 /// Distinct from `events_in_window` because this counter is NOT
140 /// reset by `collect_and_reset`. `finalize_draining` reads this
141 /// instead of `events_in_window` so a drain-window-overlap with
142 /// a metrics tick can no longer race the counter to zero before
143 /// the producer is observed.
144 pushes_since_drain_start: AtomicU64,
145}
146
147impl ShardMetricsCollector {
148 /// Create a new metrics collector.
149 pub fn new(shard_id: u16, capacity: usize) -> Self {
150 Self {
151 shard_id,
152 capacity,
153 current_len: AtomicU64::new(0),
154 events_in_window: AtomicU64::new(0),
155 push_latency: AtomicU64::new(0),
156 flush_latency: AtomicU64::new(0),
157 draining: AtomicBool::new(false),
158 window_start: RwLock::new(Instant::now()),
159 pushes_since_drain_start: AtomicU64::new(0),
160 }
161 }
162
163 /// Record current buffer length.
164 #[inline]
165 pub fn record_buffer_len(&self, len: usize) {
166 self.current_len.store(len as u64, AtomicOrdering::Relaxed);
167 }
168
169 /// Record an event ingestion: bumps the per-event counters
170 /// (events_in_window, pushes_since_drain_start) AND adds a
171 /// latency sample to the packed `(count, sum)` average.
172 ///
173 /// Pre-PERF_AUDIT §1.3 this was the only API and every
174 /// successful push called it, paying the CAS loop on
175 /// `push_latency` per event under the shard mutex. The
176 /// subsampled fast path now calls [`Self::record_event_only`]
177 /// per event and [`Self::record_latency_sample`] only once
178 /// every N pushes. This composite remains for test fixtures
179 /// and the rare caller that wants both updates together.
180 #[inline]
181 pub fn record_push(&self, latency_ns: u64) {
182 self.record_event_only();
183 self.record_latency_sample(latency_ns);
184 }
185
186 /// Bump only the per-event counters (no latency sample).
187 /// Always called by the push path so per-event statistics
188 /// (event_rate, drain-window counts) stay accurate at full
189 /// resolution; the more expensive latency CAS-loop runs only
190 /// at the subsampled cadence (PERF_AUDIT §1.3).
191 #[inline]
192 pub fn record_event_only(&self) {
193 self.events_in_window.fetch_add(1, AtomicOrdering::Relaxed);
194 // Always increment — the cost is one fetch_add and the
195 // counter only matters when the shard is draining. Cheaper
196 // than branching on `self.draining.load()` in the hot path.
197 self.pushes_since_drain_start
198 .fetch_add(1, AtomicOrdering::Relaxed);
199 }
200
201 /// Add a latency sample to the packed `(count, sum)` average.
202 /// Called once per sampling boundary (default 1-in-64 pushes)
203 /// to keep the average estimator at full statistical fidelity
204 /// without paying the CAS loop on every event (PERF_AUDIT §1.3).
205 ///
206 /// Subsampling is an unbiased estimator: `sum / count` over
207 /// random samples converges to the true mean, so the
208 /// scaling heuristic's threshold comparison stays correct;
209 /// only the variance grows. With N=64, the standard error
210 /// shrinks below 1 ns at typical event rates (>1k/sec) within
211 /// a single tick window.
212 #[inline]
213 pub fn record_latency_sample(&self, latency_ns: u64) {
214 // Atomically add 1 to count (upper 32 bits) and
215 // `latency_ns` to sum (lower 32 bits). `fetch_update`
216 // CAS-loops the load-and-store, so a concurrent
217 // `collect_and_reset` swap on the same word either sees
218 // both pre-add or both post-add — no `(sum, count)`
219 // desync. Saturating ops cap at u32::MAX inside the
220 // pack window (~4 G calls / 4 s of accumulated latency),
221 // which is far beyond any sane metrics tick.
222 let _ =
223 self.push_latency
224 .fetch_update(AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |v| {
225 let count = (v >> 32) as u32;
226 let sum = (v & 0xFFFF_FFFF) as u32;
227 let new_count = count.saturating_add(1) as u64;
228 let new_sum = sum.saturating_add(latency_ns.min(u32::MAX as u64) as u32) as u64;
229 Some((new_count << 32) | new_sum)
230 });
231 }
232
233 /// Record a batch flush.
234 #[inline]
235 pub fn record_flush(&self, latency_us: u64) {
236 // Same packed-`(count, sum)` shape as `record_push` —
237 // see that function for the desync rationale.
238 let _ = self.flush_latency.fetch_update(
239 AtomicOrdering::Relaxed,
240 AtomicOrdering::Relaxed,
241 |v| {
242 let count = (v >> 32) as u32;
243 let sum = (v & 0xFFFF_FFFF) as u32;
244 let new_count = count.saturating_add(1) as u64;
245 let new_sum = sum.saturating_add(latency_us.min(u32::MAX as u64) as u32) as u64;
246 Some((new_count << 32) | new_sum)
247 },
248 );
249 }
250
251 /// Set drain mode.
252 ///
253 /// Transitions to draining reset `pushes_since_drain_start` so
254 /// `finalize_draining` only counts pushes that arrived after the
255 /// drain began.
256 ///
257 /// A concurrent `record_push` can interleave with the store-zero
258 /// on `pushes_since_drain_start` and leave the counter at `1`
259 /// rather than `0`. That's not a correctness bug — it just defers
260 /// finalization of this shard by one metrics tick — but the
261 /// previous code did the store-zero on the counter *before*
262 /// publishing the `draining=true` flag, which made the window
263 /// slightly larger than necessary. Publishing the flag first
264 /// means any push that *observes* `draining=true` is naturally
265 /// sequenced after the reset; pushes that beat the flag publish
266 /// race the reset just like before.
267 ///
268 /// We also use `SeqCst` for the publish to give the rest of the
269 /// crate a single total order on draining transitions, which
270 /// matches the ordering on `try_enter_ingest`'s shutdown flag.
271 pub fn set_draining(&self, draining: bool) {
272 if draining {
273 // Store-zero first (so the flag publish below acts as
274 // the release-fence for both writes).
275 self.pushes_since_drain_start
276 .store(0, AtomicOrdering::SeqCst);
277 }
278 self.draining.store(draining, AtomicOrdering::SeqCst);
279 }
280
281 /// Number of pushes observed since `set_draining(true)` was
282 /// last called. Used by `finalize_draining` to detect lingering
283 /// producers that the window-reset `events_in_window` counter
284 /// can race past.
285 ///
286 /// Pre-fix used `Ordering::Relaxed`, but the writer
287 /// side (`set_draining(true)`) resets the counter under
288 /// `SeqCst`. On weakly-ordered hardware (ARM), a Relaxed
289 /// reader could observe a stale counter and `finalize_draining`
290 /// would falsely conclude the drain had flushed while
291 /// producers were still pushing. Acquire pairs with the
292 /// SeqCst release of the reset (SeqCst includes Release
293 /// semantics), making the reset happen-before this load.
294 pub fn pushes_since_drain_start(&self) -> u64 {
295 self.pushes_since_drain_start.load(AtomicOrdering::Acquire)
296 }
297
298 /// Check if draining.
299 pub fn is_draining(&self) -> bool {
300 self.draining.load(AtomicOrdering::Acquire)
301 }
302
303 /// Collect metrics and reset window counters.
304 ///
305 /// NOTE: The individual atomic swaps below are not collectively atomic with
306 /// respect to concurrent `record_push`/`record_flush` calls. This means a
307 /// push recorded between, say, the `events_in_window` swap and the
308 /// `push_count` swap could be counted in one counter but not the other for
309 /// a given window. This small inaccuracy is an accepted trade-off to
310 /// preserve the lock-free design of the hot path (`record_push` /
311 /// `record_flush`). Adding a `Mutex` here would serialize the hot path
312 /// and defeat the purpose of using atomics.
313 pub fn collect_and_reset(&self) -> ShardMetrics {
314 let current_len = self.current_len.load(AtomicOrdering::Relaxed);
315 let events = self.events_in_window.swap(0, AtomicOrdering::Relaxed);
316 // Single swap captures `(count, sum)` together; no
317 // chance of catching the sum without the matching count
318 // (or vice versa) the way two independent swaps did.
319 let push_packed = self.push_latency.swap(0, AtomicOrdering::Relaxed);
320 let push_count = push_packed >> 32;
321 let push_latency_sum = push_packed & 0xFFFF_FFFF;
322 let flush_packed = self.flush_latency.swap(0, AtomicOrdering::Relaxed);
323 let flush_count = flush_packed >> 32;
324 let flush_latency_sum = flush_packed & 0xFFFF_FFFF;
325
326 let fill_ratio = if self.capacity > 0 {
327 current_len as f64 / self.capacity as f64
328 } else {
329 0.0
330 };
331
332 let avg_push_latency = push_latency_sum.checked_div(push_count).unwrap_or(0);
333
334 let avg_flush_latency = flush_latency_sum.checked_div(flush_count).unwrap_or(0);
335
336 // Reset window
337 *self.window_start.write() = Instant::now();
338
339 let mut metrics = ShardMetrics {
340 shard_id: self.shard_id,
341 fill_ratio,
342 event_rate: events,
343 avg_push_latency_ns: avg_push_latency,
344 avg_flush_latency_us: avg_flush_latency,
345 draining: self.draining.load(AtomicOrdering::Acquire),
346 weight: 0.0,
347 last_updated: Instant::now(),
348 };
349 metrics.compute_weight();
350 metrics
351 }
352}
353
354/// Scaling decision made by the mapper.
355#[derive(Debug, Clone, PartialEq, Eq)]
356pub enum ScalingDecision {
357 /// No scaling needed.
358 None,
359 /// Scale up by adding N shards.
360 ScaleUp(u16),
361 /// Scale down by removing N shards (marks them for draining).
362 ScaleDown(u16),
363}
364
365/// Errors that can occur during scaling operations.
366#[derive(Debug, Clone)]
367pub enum ScalingError {
368 /// Invalid scaling policy.
369 InvalidPolicy(String),
370 /// Already at maximum shards.
371 AtMaxShards,
372 /// Already at minimum shards.
373 AtMinShards,
374 /// Scaling operation in cooldown.
375 InCooldown,
376 /// Shard creation failed.
377 ShardCreationFailed(String),
378}
379
380impl std::fmt::Display for ScalingError {
381 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
382 match self {
383 Self::InvalidPolicy(msg) => write!(f, "invalid scaling policy: {}", msg),
384 Self::AtMaxShards => write!(f, "already at maximum shard count"),
385 Self::AtMinShards => write!(f, "already at minimum shard count"),
386 Self::InCooldown => write!(f, "scaling operation in cooldown"),
387 Self::ShardCreationFailed(msg) => write!(f, "shard creation failed: {}", msg),
388 }
389 }
390}
391
392impl std::error::Error for ScalingError {}
393
394/// State of a shard in the mapper.
395#[derive(Debug, Clone, Copy, PartialEq, Eq)]
396pub enum ShardState {
397 /// Shard is being provisioned: id allocated and metrics collector
398 /// in place, but `select_shard` must not route to it yet because
399 /// upstream workers (drain / batch) have not been spawned. Caller
400 /// transitions to `Active` via `activate` once the workers are
401 /// ready. Closes the race where a freshly-added shard accepted
402 /// producer pushes before any consumer existed.
403 Provisioning,
404 /// Shard is active and accepting producers.
405 Active,
406 /// Shard is draining (no new producers, waiting for empty).
407 Draining,
408 /// Shard is stopped and can be removed.
409 Stopped,
410}
411
412/// Information about a shard managed by the mapper.
413#[derive(Debug)]
414struct MappedShard {
415 /// Shard ID.
416 id: u16,
417 /// Current state.
418 state: ShardState,
419 /// Metrics collector.
420 metrics: Arc<ShardMetricsCollector>,
421 /// When this shard entered drain mode (if draining).
422 drain_started: Option<Instant>,
423 /// Last collected metrics.
424 last_metrics: ShardMetrics,
425 /// When this shard last transitioned to `Active`. Used by
426 /// `evaluate_scaling` to skip recently-activated shards from
427 /// scale decisions: their `last_metrics` is the
428 /// `ShardMetrics::new(id)` placeholder until at least one
429 /// `collect_metrics` cycle has run, and the placeholder
430 /// (`fill_ratio = 0.0, event_rate = 0`) trips the
431 /// underutilized trigger immediately — oscillating the system
432 /// (scale-up → next tick scale-down → next tick scale-up …)
433 /// when a fresh shard is added but hasn't yet absorbed any
434 /// traffic.
435 activated_at: Instant,
436}
437
438/// Callback type for shard lifecycle events.
439type ShardCallback = Box<dyn Fn(u16) + Send + Sync>;
440
441/// Pre-computed slice of shard ids that `select_shard` may return.
442///
443/// Holds the subset of currently-Active shards whose `last_metrics.weight`
444/// is within tolerance of the minimum. Empty when no active shards exist
445/// (in which case `select_shard` returns `u16::MAX`).
446///
447/// The table is rebuilt by [`ShardMapper::rebuild_selection_table_locked`]
448/// at every mutation that could change which shards are routable (state
449/// transitions, metric/weight refresh, scale-up/down). Hot-path readers
450/// (`select_shard`) snapshot it via `ArcSwap::load` — zero alloc, zero
451/// lock per call.
452struct SelectionTable {
453 /// Shard ids in priority order (within-tolerance of min weight).
454 /// `Box<[u16]>` rather than `Vec<u16>` because the slice is immutable
455 /// after construction; saves one word per snapshot.
456 candidates: Box<[u16]>,
457}
458
459/// Dynamic shard mapper that manages shard allocation and producer routing.
460///
461/// This is the core component for dynamic scaling. It:
462/// - Tracks metrics for all shards
463/// - Makes scaling decisions based on policy
464/// - Routes producers to the least-loaded shards
465/// - Manages shard lifecycle (active → draining → stopped)
466pub struct ShardMapper {
467 /// Mapped shards (RwLock for concurrent reads, rare writes).
468 shards: RwLock<Vec<MappedShard>>,
469 /// Pre-computed routable-shard snapshot. Updated whenever the shard
470 /// list or weights change; read lock-free by [`Self::select_shard`].
471 ///
472 /// Pre-fix [perf #2 in `docs/performance/net-perf-analysis.md`],
473 /// `select_shard` did this work on every event: two `Vec` allocs +
474 /// an `RwLock::read` + min-weight scan + tolerance filter. At 10M
475 /// ev/s that's 20M allocs/sec plus a parking_lot acquire each time.
476 /// The table is rebuilt at most once per metrics tick and once per
477 /// state transition.
478 selection_table: ArcSwap<SelectionTable>,
479 /// Current active shard count.
480 active_count: AtomicU16,
481 /// Scaling policy.
482 ///
483 /// This field is immutable for the lifetime of the mapper. The
484 /// previous `set_policy(&mut self, …)` API was unreachable in
485 /// practice — every production callsite holds the mapper behind
486 /// an `Arc`, and `Arc::get_mut` requires a strong count of 1,
487 /// which never holds once the worker pool clones the `Arc`. The
488 /// method has been removed; recreate the mapper (and
489 /// the bus that owns it) to change the policy.
490 policy: ScalingPolicy,
491 /// Ring buffer capacity for new shards.
492 ring_buffer_capacity: usize,
493 /// Last scaling operation timestamp.
494 ///
495 /// This RwLock is **logically** scoped to the
496 /// outer `shards.write()` lock — `scale_up`, `scale_down`,
497 /// and `scale_up_provisioning` all read this field and write
498 /// to it while holding `shards.write()`. The cooldown gate
499 /// is therefore atomic with the scale mutation: no caller
500 /// can pass the cooldown check, observe stale `last_scaling`,
501 /// and have its mutation interleave with another caller.
502 ///
503 /// **If you narrow `shards.write()`'s scope in any of those
504 /// callers, this implicit serialization breaks.** Either:
505 /// - Keep the cooldown read+write inside the outer lock, OR
506 /// - Use a `compare_exchange`-style update on a single
507 /// `AtomicI64` of nanos so the gate is atomic on its own.
508 ///
509 /// The doc-comment is here so a future refactorer doesn't
510 /// silently break the contract.
511 last_scaling: RwLock<Option<Instant>>,
512 /// Callback for shard creation (provided by ShardManager).
513 on_shard_created: RwLock<Option<ShardCallback>>,
514 /// Callback for shard removal (provided by ShardManager).
515 on_shard_removed: RwLock<Option<ShardCallback>>,
516 /// Monotonic shard-id allocator. The next `scale_up` call gets
517 /// `fetch_add(1)` from here. Distinct from `shards.iter().max() +
518 /// 1`: that approach reused ids whenever the highest-numbered
519 /// shard had been drained-and-removed, silently merging two
520 /// unrelated shard lifetimes in any external system that keys
521 /// metrics or checkpoints on shard id. Monotonic allocation
522 /// ensures every shard ever allocated has a globally unique id
523 /// for the lifetime of this mapper.
524 next_shard_id: AtomicU16,
525}
526
527impl ShardMapper {
528 /// Create a new shard mapper with the given initial shard count and policy.
529 pub fn new(
530 initial_shards: u16,
531 ring_buffer_capacity: usize,
532 policy: ScalingPolicy,
533 ) -> Result<Self, ScalingError> {
534 let mut policy = policy.normalize();
535 // Ensure max_shards can accommodate the initial shard count
536 if policy.max_shards < initial_shards {
537 policy.max_shards = initial_shards;
538 }
539 policy
540 .validate()
541 .map_err(|e| ScalingError::InvalidPolicy(e.to_string()))?;
542
543 // Initial shards: stamp `activated_at` far enough in the
544 // past that they're not subject to the warmup skip in
545 // `evaluate_scaling`. The boot-time shards have whatever
546 // baseline traffic the system serves; they shouldn't be
547 // exempted from scale decisions just because the mapper
548 // was just constructed.
549 let boot = Instant::now()
550 .checked_sub(std::time::Duration::from_secs(3600))
551 .unwrap_or_else(Instant::now);
552 let shards: Vec<MappedShard> = (0..initial_shards)
553 .map(|id| MappedShard {
554 id,
555 state: ShardState::Active,
556 metrics: Arc::new(ShardMetricsCollector::new(id, ring_buffer_capacity)),
557 drain_started: None,
558 last_metrics: ShardMetrics::new(id),
559 activated_at: boot,
560 })
561 .collect();
562
563 let mapper = Self {
564 shards: RwLock::new(shards),
565 selection_table: ArcSwap::from_pointee(SelectionTable {
566 candidates: Box::new([]),
567 }),
568 active_count: AtomicU16::new(initial_shards),
569 policy,
570 ring_buffer_capacity,
571 last_scaling: RwLock::new(None),
572 on_shard_created: RwLock::new(None),
573 on_shard_removed: RwLock::new(None),
574 // Initial shards occupy ids `[0, initial_shards)`, so the
575 // first scale-up takes `initial_shards`.
576 next_shard_id: AtomicU16::new(initial_shards),
577 };
578 // Seed the selection table with the boot shards.
579 mapper.rebuild_selection_table_locked(&mapper.shards.read());
580 Ok(mapper)
581 }
582
583 /// Rebuild the [`Self::selection_table`] from the current shards
584 /// slice. Must be called while holding the `shards` lock (read or
585 /// write) so the snapshot it captures is consistent with the
586 /// state visible to other callers.
587 ///
588 /// Filters to `Active` shards, finds the minimum weight, and
589 /// retains every active shard whose weight is within 0.1 of the
590 /// minimum. The tolerance pre-existed the perf fix (see the old
591 /// `select_shard` body) so the new table preserves the same
592 /// routing semantics.
593 ///
594 /// NaN-tolerance edge case: if every active shard has a NaN
595 /// weight the tolerance filter excludes all of them. To preserve
596 /// the pre-fix fallback (`active[0].id`), the table falls back
597 /// to *every* active id in that case.
598 fn rebuild_selection_table_locked(&self, shards: &[MappedShard]) {
599 let mut active_ids = Vec::with_capacity(shards.len());
600 for s in shards.iter() {
601 if s.state == ShardState::Active {
602 active_ids.push(s.id);
603 }
604 }
605 if active_ids.is_empty() {
606 self.selection_table.store(Arc::new(SelectionTable {
607 candidates: Box::new([]),
608 }));
609 return;
610 }
611 let min_weight = shards
612 .iter()
613 .filter(|s| s.state == ShardState::Active)
614 .map(|s| s.last_metrics.weight)
615 .fold(f64::MAX, f64::min);
616 let mut candidates: Vec<u16> = shards
617 .iter()
618 .filter(|s| {
619 s.state == ShardState::Active && (s.last_metrics.weight - min_weight).abs() < 0.1
620 })
621 .map(|s| s.id)
622 .collect();
623 if candidates.is_empty() {
624 // All-NaN fallback. Preserves the pre-fix `active[0].id`
625 // safety net; the random pick across all active shards
626 // is a strict superset of "first active."
627 candidates = active_ids;
628 }
629 self.selection_table.store(Arc::new(SelectionTable {
630 candidates: candidates.into_boxed_slice(),
631 }));
632 }
633
634 /// Set callback for shard creation.
635 pub fn set_on_shard_created<F>(&self, callback: F)
636 where
637 F: Fn(u16) + Send + Sync + 'static,
638 {
639 *self.on_shard_created.write() = Some(Box::new(callback));
640 }
641
642 /// Set callback for shard removal.
643 pub fn set_on_shard_removed<F>(&self, callback: F)
644 where
645 F: Fn(u16) + Send + Sync + 'static,
646 {
647 *self.on_shard_removed.write() = Some(Box::new(callback));
648 }
649
650 /// Get the metrics collector for a shard.
651 pub fn metrics_collector(&self, shard_id: u16) -> Option<Arc<ShardMetricsCollector>> {
652 let shards = self.shards.read();
653 shards
654 .iter()
655 .find(|s| s.id == shard_id)
656 .map(|s| s.metrics.clone())
657 }
658
659 /// Get current active shard count.
660 pub fn active_shard_count(&self) -> u16 {
661 self.active_count.load(AtomicOrdering::Acquire)
662 }
663
664 /// Get total shard count (including draining).
665 pub fn total_shard_count(&self) -> u16 {
666 self.shards.read().len() as u16
667 }
668
669 /// Select the best shard for a new event/producer.
670 ///
671 /// Pre-fix this acquired `shards.read()` and allocated two `Vec`s
672 /// per call (active filter, candidate filter) — at 10M ev/s that
673 /// was 20M allocs/sec plus a parking_lot acquire per event.
674 /// Post-fix the routable subset is pre-computed in the private
675 /// `selection_table` field and updated only at state
676 /// transitions / metric refreshes; the hot path is a single
677 /// `ArcSwap::load` + Lemire-bias-free integer mapping.
678 ///
679 /// Semantics preserved:
680 /// - Only considers active (non-draining) shards
681 /// - Prefers shards with lower weight (less loaded)
682 /// - Falls back to a random pick across all active shards if the
683 /// tolerance filter excludes everything (NaN-weight edge case)
684 /// - Returns `u16::MAX` when no active shard exists, so callers
685 /// that route via the routing table get a definite miss
686 #[inline]
687 pub fn select_shard(&self, event_hash: u64) -> u16 {
688 let table = self.selection_table.load();
689 if table.candidates.is_empty() {
690 // No active shards. Pre-fix returned `u16::MAX` from the
691 // same arm — callers that look up the id in the routing
692 // table get a definite miss, which the manager already
693 // accounts for.
694 return u16::MAX;
695 }
696 // Lemire's bias-free index mapping for any `len` that fits
697 // in u64. Pre-fix used `(event_hash as usize) %
698 // candidates.len()`, which biases low-bucket indices when
699 // `candidates.len()` is not a power of two
700 // (https://lemire.me/blog/2016/06/30/fast-random-shuffling/).
701 let idx = ((event_hash as u128 * table.candidates.len() as u128) >> 64) as usize;
702 table.candidates[idx]
703 }
704
705 /// Collect metrics from all shards and update weights.
706 pub fn collect_metrics(&self) -> Vec<ShardMetrics> {
707 let mut shards = self.shards.write();
708 let result: Vec<ShardMetrics> = shards
709 .iter_mut()
710 .map(|s| {
711 s.last_metrics = s.metrics.collect_and_reset();
712 s.last_metrics.clone()
713 })
714 .collect();
715 // Weights just changed — refresh the selection table so
716 // `select_shard` observes the new min-weight candidate set
717 // without re-scanning on every event.
718 self.rebuild_selection_table_locked(&shards);
719 result
720 }
721
722 /// Evaluate scaling based on current metrics.
723 ///
724 /// Returns a scaling decision without executing it.
725 pub fn evaluate_scaling(&self) -> ScalingDecision {
726 if !self.policy.auto_scale {
727 return ScalingDecision::None;
728 }
729
730 // Check cooldown
731 if let Some(last) = *self.last_scaling.read() {
732 if last.elapsed() < self.policy.cooldown {
733 return ScalingDecision::None;
734 }
735 }
736
737 let shards = self.shards.read();
738 let active_count = self.active_count.load(AtomicOrdering::Acquire);
739
740 // Check for scale-up triggers
741 let mut overloaded_count = 0;
742 let mut underutilized_count = 0;
743
744 // Warmup window for freshly-activated shards. A just-
745 // activated shard's `last_metrics` is the
746 // `ShardMetrics::new(id)` placeholder
747 // (`fill_ratio = 0.0, event_rate = 0`) until at least one
748 // `collect_metrics` cycle has run. The placeholder
749 // immediately matches the underutilized trigger
750 // (`fill_ratio < underutilized_threshold && event_rate
751 // == 0`), so a fresh shard added by scale-up would
752 // immediately count as underutilized on the next
753 // `evaluate_scaling` and trigger scale-down — oscillating
754 // the system. Reuse `policy.cooldown` as the warmup
755 // window: it's already the minimum gap between scaling
756 // actions, and a shard collected at least once within
757 // that window has accumulated real metrics.
758 let now = Instant::now();
759 let warmup = self.policy.cooldown;
760
761 for shard in shards.iter() {
762 if shard.state != ShardState::Active {
763 continue;
764 }
765
766 // Skip the placeholder-metrics window for freshly-
767 // activated shards. They count toward `active_count`
768 // (so the budget math stays consistent) but don't
769 // tip the overload/underutilized tallies.
770 if now.duration_since(shard.activated_at) < warmup {
771 continue;
772 }
773
774 let m = &shard.last_metrics;
775
776 // Scale-up triggers
777 if m.fill_ratio > self.policy.fill_ratio_threshold
778 || m.avg_push_latency_ns > self.policy.push_latency_threshold_ns
779 || m.avg_flush_latency_us > self.policy.flush_latency_threshold_us
780 {
781 overloaded_count += 1;
782 }
783
784 // Scale-down triggers
785 if m.fill_ratio < self.policy.underutilized_threshold && m.event_rate == 0 {
786 underutilized_count += 1;
787 }
788 }
789
790 // Scale up if more than half of shards are overloaded
791 if overloaded_count > active_count / 2 && active_count < self.policy.max_shards {
792 // Add shards proportional to overload
793 let shards_to_add = (overloaded_count / 2)
794 .max(1)
795 .min(self.policy.max_shards - active_count);
796 return ScalingDecision::ScaleUp(shards_to_add);
797 }
798
799 // Scale down if more than half of shards are underutilized
800 // and we're above minimum
801 if underutilized_count > active_count / 2 && active_count > self.policy.min_shards {
802 let shards_to_remove = (underutilized_count / 2)
803 .max(1)
804 .min(active_count - self.policy.min_shards);
805 return ScalingDecision::ScaleDown(shards_to_remove);
806 }
807
808 ScalingDecision::None
809 }
810
811 /// Validate cooldown + max-shards budget for an `add count`
812 /// scale-up request. Cheap pre-check that doesn't take the
813 /// write lock — callers re-check under the lock.
814 fn check_scale_up_budget(&self, count: u16) -> Result<(), ScalingError> {
815 let current = self.active_count.load(AtomicOrdering::Acquire);
816 let would_be = current
817 .checked_add(count)
818 .ok_or(ScalingError::AtMaxShards)?;
819 if would_be > self.policy.max_shards {
820 return Err(ScalingError::AtMaxShards);
821 }
822 let last = self.last_scaling.read();
823 if let Some(ts) = *last {
824 if ts.elapsed() < self.policy.cooldown {
825 return Err(ScalingError::InCooldown);
826 }
827 }
828 Ok(())
829 }
830
831 /// Allocate `count` shard ids and push their `MappedShard`
832 /// records into `shards` with the supplied `state`. The caller
833 /// already holds `self.shards.write()` and is responsible for
834 /// dropping it before notifying callbacks. Returns the allocated
835 /// ids in order.
836 ///
837 /// Performs the budget + cooldown re-check under the write lock,
838 /// the next_shard_id allocation, and the per-shard push. Does NOT
839 /// touch `active_count` — `scale_up` bumps it for `Active` shards;
840 /// `Provisioning` shards bump it later when `activate` fires.
841 fn allocate_shards_inner(
842 &self,
843 count: u16,
844 state: ShardState,
845 shards: &mut Vec<MappedShard>,
846 ) -> Result<Vec<u16>, ScalingError> {
847 self.allocate_shards_inner_with_policy(count, state, shards, false)
848 }
849
850 fn allocate_shards_inner_with_policy(
851 &self,
852 count: u16,
853 state: ShardState,
854 shards: &mut Vec<MappedShard>,
855 force: bool,
856 ) -> Result<Vec<u16>, ScalingError> {
857 // Re-check budget under the write lock — two concurrent
858 // scale-up callers could both pass the read-locked early
859 // check, both serialize through `shards.write()`, and both
860 // succeed without this re-check.
861 if force {
862 // Budget only — skip cooldown for operator-initiated paths.
863 let current = self.active_count.load(AtomicOrdering::Acquire);
864 let would_be = current
865 .checked_add(count)
866 .ok_or(ScalingError::AtMaxShards)?;
867 if would_be > self.policy.max_shards {
868 return Err(ScalingError::AtMaxShards);
869 }
870 } else {
871 self.check_scale_up_budget(count)?;
872 }
873
874 let first_id = self.next_shard_id.load(AtomicOrdering::Relaxed);
875 let last_needed = first_id
876 .checked_add(count.saturating_sub(1))
877 .ok_or(ScalingError::AtMaxShards)?;
878 // Reserve `u16::MAX` as a sentinel so the post-allocation
879 // store cannot wrap.
880 if last_needed == u16::MAX {
881 return Err(ScalingError::AtMaxShards);
882 }
883 // `first_id + count == last_needed + 1`. We already
884 // refused `last_needed == u16::MAX` above, so the sum is
885 // provably <= u16::MAX. Use `checked_add` anyway as a
886 // belt-and-suspenders guard: a future change that
887 // weakens the sentinel check would otherwise reach an
888 // unchecked u16 wrap here, silently rolling
889 // `next_shard_id` back to 0 and then re-issuing already-
890 // allocated ids.
891 let next_id_after = first_id
892 .checked_add(count)
893 .ok_or(ScalingError::AtMaxShards)?;
894 self.next_shard_id
895 .store(next_id_after, AtomicOrdering::Relaxed);
896
897 let mut new_ids = Vec::with_capacity(count as usize);
898 let now = Instant::now();
899 for i in 0..count {
900 let new_id = first_id + i;
901 shards.push(MappedShard {
902 id: new_id,
903 state,
904 metrics: Arc::new(ShardMetricsCollector::new(
905 new_id,
906 self.ring_buffer_capacity,
907 )),
908 drain_started: None,
909 last_metrics: ShardMetrics::new(new_id),
910 // Stamp the activation moment so `evaluate_scaling`
911 // can skip this shard until at least one collect
912 // cycle has run. Prevents the placeholder
913 // (`fill_ratio = 0, event_rate = 0`) from
914 // immediately tripping the underutilized trigger
915 // and oscillating the system.
916 activated_at: now,
917 });
918 new_ids.push(new_id);
919 }
920 Ok(new_ids)
921 }
922
923 /// Execute a scale-up operation.
924 ///
925 /// Creates new shards in the `Active` state and makes them
926 /// immediately available for routing. Use [`scale_up_provisioning`]
927 /// if upstream workers (drain / batch) need to be wired up before
928 /// the shard becomes selectable — otherwise producer pushes can
929 /// race ahead of consumer creation.
930 ///
931 /// [`scale_up_provisioning`]: Self::scale_up_provisioning
932 pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
933 // Short-circuit `count == 0` so a no-op call doesn't bump the
934 // cooldown timestamp or trip the `u16::MAX` sentinel check
935 // (which previously fired spuriously when
936 // `first_id == u16::MAX` even though zero ids were being
937 // allocated).
938 if count == 0 {
939 return Ok(Vec::new());
940 }
941
942 self.check_scale_up_budget(count)?;
943
944 let mut shards = self.shards.write();
945 let new_ids = self.allocate_shards_inner(count, ShardState::Active, &mut shards)?;
946
947 // Update counts. `fetch_add` cannot wrap here — the
948 // `check_scale_up_budget` gate above ensures `current + count <=
949 // max_shards <= u16::MAX` — but keep the ordering explicit
950 // so the next contender's cooldown re-check sees the fresh
951 // timestamp.
952 self.active_count.fetch_add(count, AtomicOrdering::Release);
953 *self.last_scaling.write() = Some(Instant::now());
954
955 // New active shards are routable immediately. Refresh the
956 // selection table before dropping the lock so any concurrent
957 // `select_shard` observes the new ids in its very next call.
958 self.rebuild_selection_table_locked(&shards);
959
960 // Drop the write lock before notifying callbacks — they
961 // are user-supplied and may take arbitrary time.
962 drop(shards);
963
964 if let Some(callback) = self.on_shard_created.read().as_ref() {
965 for &id in &new_ids {
966 callback(id);
967 }
968 }
969
970 Ok(new_ids)
971 }
972
973 /// Like [`scale_up`], but the new shards are created in the
974 /// `Provisioning` state. They receive an id and a metrics
975 /// collector, but `select_shard` will not route to them and they
976 /// are excluded from `active_shard_count` / `evaluate_scaling`
977 /// until the caller transitions each shard with [`activate`].
978 ///
979 /// Use this when upstream consumer infrastructure (drain/batch
980 /// workers, mpsc channels, etc.) must be wired up *before* the
981 /// shard becomes selectable. Without this gating, producers can
982 /// observe the shard via `select_shard`, push into its ring
983 /// buffer, and never have those events drained.
984 ///
985 /// Returns the allocated ids in order. Cooldown / `max_shards`
986 /// gating matches `scale_up` so that staged allocation cannot be
987 /// used to bypass the policy.
988 ///
989 /// [`scale_up`]: Self::scale_up
990 /// [`activate`]: Self::activate
991 pub fn scale_up_provisioning(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
992 if count == 0 {
993 return Ok(Vec::new());
994 }
995
996 self.check_scale_up_budget(count)?;
997
998 let mut shards = self.shards.write();
999 let new_ids = self.allocate_shards_inner(count, ShardState::Provisioning, &mut shards)?;
1000
1001 // Bump cooldown but NOT active_count — these shards are
1002 // not active yet. `activate` bumps active_count when each
1003 // becomes selectable.
1004 *self.last_scaling.write() = Some(Instant::now());
1005 drop(shards);
1006 // Intentionally do NOT fire `on_shard_created` here — the
1007 // callback signals "shard is live"; provisioning shards are
1008 // not. `activate` fires it instead.
1009 Ok(new_ids)
1010 }
1011
1012 /// Allocate `count` Provisioning shards, bypassing the cooldown gate.
1013 ///
1014 /// Used by operator-initiated `manual_scale_up` paths. The
1015 /// cooldown exists to prevent the auto-scaling monitor from
1016 /// scaling-up too aggressively in response to transient
1017 /// load spikes; a manual call from an operator is a
1018 /// deliberate request that should not be rate-limited by
1019 /// the auto-scaling cadence. The budget check (against
1020 /// `max_shards`) still applies.
1021 ///
1022 /// Pre-fix `manual_scale_up(N)` looped `add_shard()` N
1023 /// times, each call invoking `scale_up_provisioning(1)`
1024 /// which bumped `last_scaling`. The second call then
1025 /// immediately failed with `InCooldown` (default 30s
1026 /// cooldown), leaving the first shard half-added and
1027 /// returning an error to the operator with no rollback.
1028 pub fn scale_up_provisioning_force(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1029 if count == 0 {
1030 return Ok(Vec::new());
1031 }
1032
1033 let mut shards = self.shards.write();
1034 let new_ids = self.allocate_shards_inner_with_policy(
1035 count,
1036 ShardState::Provisioning,
1037 &mut shards,
1038 true, // skip cooldown
1039 )?;
1040
1041 // Still bump the cooldown timestamp so the next
1042 // *auto-scaling* tick respects the cooldown floor.
1043 *self.last_scaling.write() = Some(Instant::now());
1044 drop(shards);
1045 Ok(new_ids)
1046 }
1047
1048 /// Transition a `Provisioning` shard to `Active`.
1049 ///
1050 /// Returns `Ok(true)` if a state transition actually occurred
1051 /// (Provisioning → Active) and `Ok(false)` if the shard was
1052 /// already `Active` — the latter is the idempotent path.
1053 /// Returns `InvalidPolicy` for unknown or `Draining`/`Stopped`
1054 /// shards — those states require a different lifecycle path.
1055 /// Bumps `active_count` and notifies the `on_shard_created`
1056 /// callback exactly once per real transition.
1057 ///
1058 /// Pre-fix this returned `Result<(), ScalingError>`,
1059 /// so callers (notably `ShardManager::activate_shard`) could
1060 /// not tell whether they had bumped the live count or not and
1061 /// double-incremented their own `num_shards` on every
1062 /// idempotent call.
1063 pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError> {
1064 let mut shards = self.shards.write();
1065 let shard = shards
1066 .iter_mut()
1067 .find(|s| s.id == shard_id)
1068 .ok_or_else(|| {
1069 ScalingError::InvalidPolicy(format!("activate: shard {} not found", shard_id))
1070 })?;
1071 match shard.state {
1072 ShardState::Active => return Ok(false),
1073 ShardState::Provisioning => {
1074 // Gate activation on
1075 // `active_count < max_shards`. The budget gate at
1076 // `check_scale_up_budget` only counts ALREADY-active
1077 // shards — multiple `scale_up_provisioning(1)` calls
1078 // can each pass (they don't bump `active_count`),
1079 // and an unconditional `fetch_add(1)` here would
1080 // push past `max_shards`. Subsequent
1081 // `evaluate_scaling`'s `max_shards - active_count`
1082 // arithmetic would then underflow u16 (debug-build
1083 // panic; release wraps to ~65530). Activate-time
1084 // re-checks the budget with the lock held; the
1085 // Provisioning shard stays in Provisioning state and
1086 // the caller (e.g. `add_shard_internal`'s rollback)
1087 // is responsible for tearing it down.
1088 //
1089 // The load + state mutation + `fetch_add` all happen
1090 // while we hold the `shards.write()` guard so a
1091 // concurrent `activate(distinct_id)` reads the
1092 // already-bumped `active_count` and hits
1093 // `AtMaxShards` instead of squeezing through the
1094 // window between our state update and our
1095 // `fetch_add`. Pre-fix the `fetch_add` ran after
1096 // `drop(shards)` — two activates could each see a
1097 // stale count below `max_shards` and both bump,
1098 // transiently overshooting the budget.
1099 let current = self.active_count.load(AtomicOrdering::Acquire);
1100 if current >= self.policy.max_shards {
1101 return Err(ScalingError::AtMaxShards);
1102 }
1103 shard.state = ShardState::Active;
1104 // Re-stamp `activated_at` so `evaluate_scaling`
1105 // gives this freshly-activated shard a warmup
1106 // window before counting it in the
1107 // overloaded/underutilized tallies. The
1108 // Provisioning → Active transition is the moment
1109 // traffic starts flowing; the shard's
1110 // `last_metrics` is still the
1111 // `ShardMetrics::new(id)` placeholder until the
1112 // next `collect_metrics` cycle.
1113 shard.activated_at = Instant::now();
1114 // Publish the increment while still holding the
1115 // write lock. `Release` here pairs with the
1116 // `Acquire` load above so any later activator that
1117 // takes the same lock observes our bump.
1118 self.active_count.fetch_add(1, AtomicOrdering::Release);
1119 }
1120 ShardState::Draining | ShardState::Stopped => {
1121 return Err(ScalingError::InvalidPolicy(format!(
1122 "activate: shard {} is in state {:?}, cannot activate",
1123 shard_id, shard.state
1124 )));
1125 }
1126 }
1127 // The shard just became routable. Refresh the selection
1128 // table before dropping the lock so the next `select_shard`
1129 // observes it.
1130 self.rebuild_selection_table_locked(&shards);
1131 drop(shards);
1132
1133 if let Some(callback) = self.on_shard_created.read().as_ref() {
1134 callback(shard_id);
1135 }
1136 Ok(true)
1137 }
1138
1139 /// Drain a specific shard by id, transitioning it from `Active`
1140 /// to `Draining`.
1141 ///
1142 /// Companion to `ShardManager::drain_shard`. The previous
1143 /// implementation only flipped the metrics collector's `draining`
1144 /// atomic; this version atomically updates `MappedShard.state`
1145 /// (so `select_shard` stops routing to the shard) and decrements
1146 /// `active_count` (so `evaluate_scaling`'s budget math stays
1147 /// consistent with `scale_down`). Returns an error if the shard
1148 /// is not in `Active` state, or if doing so would push the active
1149 /// count below `min_shards`.
1150 pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError> {
1151 let mut shards = self.shards.write();
1152 let current_active = self.active_count.load(AtomicOrdering::Acquire);
1153 if current_active <= self.policy.min_shards {
1154 return Err(ScalingError::AtMinShards);
1155 }
1156 let shard = shards
1157 .iter_mut()
1158 .find(|s| s.id == shard_id)
1159 .ok_or_else(|| {
1160 ScalingError::InvalidPolicy(format!("drain_specific: shard {} not found", shard_id))
1161 })?;
1162 match shard.state {
1163 ShardState::Active => {
1164 shard.state = ShardState::Draining;
1165 shard.drain_started = Some(Instant::now());
1166 shard.metrics.set_draining(true);
1167 }
1168 ShardState::Draining => return Ok(()),
1169 ShardState::Provisioning | ShardState::Stopped => {
1170 return Err(ScalingError::InvalidPolicy(format!(
1171 "drain_specific: shard {} is in state {:?}, cannot drain",
1172 shard_id, shard.state
1173 )));
1174 }
1175 }
1176 // The drained shard is no longer routable. Refresh the
1177 // selection table before dropping the lock.
1178 self.rebuild_selection_table_locked(&shards);
1179 drop(shards);
1180 self.active_count.fetch_sub(1, AtomicOrdering::Release);
1181 // Bump `last_scaling` so a subsequent `scale_up` is gated
1182 // by the cooldown floor. Pre-fix `drain_specific` removed
1183 // a shard from Active without touching `last_scaling`, so
1184 // the sequence `drain_specific(id) → scale_up(N)`
1185 // bypassed the cooldown — `scale_down` writes
1186 // `last_scaling` precisely for this reason. From the
1187 // budget-math perspective `drain_specific` IS a scale-
1188 // down (it decrements `active_count` and trips the
1189 // `min_shards` floor), so it should also gate
1190 // re-expansion the same way.
1191 *self.last_scaling.write() = Some(Instant::now());
1192 Ok(())
1193 }
1194
1195 /// Start draining shards for scale-down.
1196 ///
1197 /// Marks shards as draining so they stop receiving new events.
1198 /// Shards will be removed once they're empty.
1199 pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1200 // Early checks (may race, but avoid acquiring the write lock)
1201 let current = self.active_count.load(AtomicOrdering::Acquire);
1202 if current <= self.policy.min_shards {
1203 return Err(ScalingError::AtMinShards);
1204 }
1205
1206 let to_drain = count.min(current - self.policy.min_shards);
1207 if to_drain == 0 {
1208 return Err(ScalingError::AtMinShards);
1209 }
1210 {
1211 let last = self.last_scaling.read();
1212 if let Some(ts) = *last {
1213 if ts.elapsed() < self.policy.cooldown {
1214 return Err(ScalingError::InCooldown);
1215 }
1216 }
1217 }
1218
1219 let mut shards = self.shards.write();
1220
1221 // Re-check under the lock to prevent race conditions (double-check pattern)
1222 let current = self.active_count.load(AtomicOrdering::Acquire);
1223 if current <= self.policy.min_shards {
1224 return Err(ScalingError::AtMinShards);
1225 }
1226 let to_drain = count.min(current - self.policy.min_shards);
1227 if to_drain == 0 {
1228 return Err(ScalingError::AtMinShards);
1229 }
1230 // Re-check cooldown under the same write lock that gates
1231 // mutation — see the matching note in `scale_up`.
1232 {
1233 let last = self.last_scaling.read();
1234 if let Some(ts) = *last {
1235 if ts.elapsed() < self.policy.cooldown {
1236 return Err(ScalingError::InCooldown);
1237 }
1238 }
1239 }
1240
1241 let mut drained_ids = Vec::with_capacity(to_drain as usize);
1242
1243 // Find shards with lowest weight (least utilized) to drain
1244 let mut active_indices: Vec<_> = shards
1245 .iter()
1246 .enumerate()
1247 .filter(|(_, s)| s.state == ShardState::Active)
1248 .map(|(i, s)| (i, s.last_metrics.weight))
1249 .collect();
1250
1251 // Sort by weight (ascending - drain least utilized first)
1252 active_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
1253
1254 // Mark shards for draining
1255 for (idx, _) in active_indices.into_iter().take(to_drain as usize) {
1256 shards[idx].state = ShardState::Draining;
1257 shards[idx].drain_started = Some(Instant::now());
1258 shards[idx].metrics.set_draining(true);
1259 drained_ids.push(shards[idx].id);
1260 }
1261
1262 // Update count
1263 self.active_count
1264 .fetch_sub(to_drain, AtomicOrdering::Release);
1265 *self.last_scaling.write() = Some(Instant::now());
1266
1267 // Drained shards are no longer routable. Refresh the
1268 // selection table while still holding the lock.
1269 self.rebuild_selection_table_locked(&shards);
1270
1271 Ok(drained_ids)
1272 }
1273
1274 /// Check draining shards and finalize those that are empty.
1275 ///
1276 /// Returns IDs of shards that were stopped.
1277 ///
1278 /// This predicate looks ONLY at the ring buffer
1279 /// (`current_len` + `pushes_since_drain_start`); it does NOT
1280 /// probe the per-shard mpsc channel or the BatchWorker's
1281 /// `current_batch`. A shard that the predicate flags as empty
1282 /// can still have events queued in those two places. The
1283 /// correctness gate is therefore `bus::remove_shard_internal`,
1284 /// which awaits the BatchWorker's `JoinHandle` before
1285 /// constructing the stranded-flush batch — see that function's
1286 /// step 3 for the rationale. Tightening this predicate is a
1287 /// defense-in-depth follow-up; a stricter ring-buffer-empty
1288 /// signal here would only narrow an already-closed window.
1289 pub fn finalize_draining(&self) -> Vec<u16> {
1290 let mut shards = self.shards.write();
1291 let mut stopped = Vec::new();
1292
1293 for shard in shards.iter_mut() {
1294 if shard.state == ShardState::Draining {
1295 // Check if shard is empty by reading current_len directly,
1296 // avoiding collect_and_reset() which destructively zeros all counters.
1297 let current_len = shard.metrics.current_len.load(AtomicOrdering::Relaxed);
1298 let fill_ratio = if shard.metrics.capacity > 0 {
1299 current_len as f64 / shard.metrics.capacity as f64
1300 } else {
1301 0.0
1302 };
1303 // Previously read `events_in_window` here, which
1304 // `collect_and_reset` zeros every metrics tick. A
1305 // producer push that landed in the window between two
1306 // ticks could be silently zeroed out, so a draining
1307 // shard whose buffer transiently emptied was finalized
1308 // with a producer still attached.
1309 // `pushes_since_drain_start` is a separate counter
1310 // that is only reset by `set_draining(true)`, so any
1311 // push observed since the drain began is sticky —
1312 // exactly the signal we want.
1313 // Acquire pairs with `set_draining`'s SeqCst reset so
1314 // the load can't observe a stale value from before the
1315 // drain began. A Relaxed load here let weakly-ordered
1316 // hardware see the pre-reset count and finalize while
1317 // a producer was still pushing.
1318 let pushes_after_drain = shard.metrics.pushes_since_drain_start();
1319 if fill_ratio == 0.0 && pushes_after_drain == 0 {
1320 // Check if we've waited long enough
1321 if let Some(drain_start) = shard.drain_started {
1322 if drain_start.elapsed() > Duration::from_millis(100) {
1323 shard.state = ShardState::Stopped;
1324 stopped.push(shard.id);
1325 }
1326 }
1327 }
1328 }
1329 }
1330
1331 // Draining → Stopped transitions: no impact on the routable
1332 // set (Draining was already excluded), but rebuild for
1333 // consistency in case `last_metrics.weight` was stale on a
1334 // shard that just stopped. Cheap; bounded by shard count.
1335 if !stopped.is_empty() {
1336 self.rebuild_selection_table_locked(&shards);
1337 }
1338
1339 // Drop the write lock BEFORE notifying. The callback is
1340 // user-supplied and may re-enter the mapper (`shard_state`,
1341 // `select_shard`, `metrics_collector`, …), each of which
1342 // acquires `shards.read()`. `parking_lot::RwLock` is not
1343 // recursive, so a re-entrant read attempt while we hold a
1344 // write would deadlock. `scale_up`'s callback path already
1345 // releases its lock before calling out — mirror that
1346 // here.
1347 drop(shards);
1348
1349 if !stopped.is_empty() {
1350 if let Some(callback) = self.on_shard_removed.read().as_ref() {
1351 for &id in &stopped {
1352 callback(id);
1353 }
1354 }
1355 }
1356
1357 stopped
1358 }
1359
1360 /// Remove a specific shard from the mapper if it is in the
1361 /// `Stopped` state. Used by `ShardManager::remove_shard` so a
1362 /// per-shard cleanup doesn't disturb sibling `Stopped`
1363 /// entries — which a sequential `manual_scale_down` loop
1364 /// still needs to look up state for. Returns `true` if the
1365 /// shard existed and was Stopped (and was removed).
1366 pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool {
1367 let mut shards = self.shards.write();
1368 let before = shards.len();
1369 shards.retain(|s| !(s.id == shard_id && s.state == ShardState::Stopped));
1370 let removed = shards.len() < before;
1371 if removed {
1372 // Stopped shards weren't routable, but the underlying
1373 // slice changed length / order — rebuild so the table
1374 // doesn't dangle a stale id.
1375 self.rebuild_selection_table_locked(&shards);
1376 }
1377 removed
1378 }
1379
1380 /// Remove stopped shards from the mapper.
1381 pub fn remove_stopped_shards(&self) -> Vec<u16> {
1382 let mut shards = self.shards.write();
1383 let before = shards.len();
1384 let removed: Vec<u16> = shards
1385 .iter()
1386 .filter(|s| s.state == ShardState::Stopped)
1387 .map(|s| s.id)
1388 .collect();
1389
1390 shards.retain(|s| s.state != ShardState::Stopped);
1391
1392 if shards.len() < before {
1393 tracing::info!(
1394 removed = removed.len(),
1395 remaining = shards.len(),
1396 "Removed stopped shards"
1397 );
1398 // Same reason as `remove_specific_stopped_shard`.
1399 self.rebuild_selection_table_locked(&shards);
1400 }
1401
1402 removed
1403 }
1404
1405 /// Get the state of a specific shard.
1406 pub fn shard_state(&self, shard_id: u16) -> Option<ShardState> {
1407 self.shards
1408 .read()
1409 .iter()
1410 .find(|s| s.id == shard_id)
1411 .map(|s| s.state)
1412 }
1413
1414 /// Get all active shard IDs.
1415 pub fn active_shard_ids(&self) -> Vec<u16> {
1416 self.shards
1417 .read()
1418 .iter()
1419 .filter(|s| s.state == ShardState::Active)
1420 .map(|s| s.id)
1421 .collect()
1422 }
1423
1424 /// Get all shard IDs (including draining).
1425 pub fn all_shard_ids(&self) -> Vec<u16> {
1426 self.shards
1427 .read()
1428 .iter()
1429 .filter(|s| s.state != ShardState::Stopped)
1430 .map(|s| s.id)
1431 .collect()
1432 }
1433
1434 /// Get the scaling policy.
1435 pub fn policy(&self) -> &ScalingPolicy {
1436 &self.policy
1437 }
1438 // `set_policy` previously took `&mut self` and was unreachable
1439 // through the `Arc<ShardMapper>` that the production code holds
1440 // (`Arc::get_mut` fails once the worker pool has cloned the Arc).
1441 // The method has been removed — recreate the mapper / bus to
1442 // change the policy.
1443}
1444
1445#[cfg(test)]
1446mod tests {
1447 #![allow(
1448 clippy::disallowed_methods,
1449 reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1450 )]
1451 use super::*;
1452
1453 #[test]
1454 fn test_shard_mapper_creation() {
1455 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1456 assert_eq!(mapper.active_shard_count(), 4);
1457 assert_eq!(mapper.total_shard_count(), 4);
1458 }
1459
1460 #[test]
1461 fn test_select_shard_distributes() {
1462 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1463
1464 // Different hashes should potentially select different shards
1465 let mut selected = std::collections::HashSet::new();
1466 for i in 0..100u64 {
1467 let shard = mapper.select_shard(i * 12345);
1468 selected.insert(shard);
1469 }
1470
1471 // With 4 shards, we should hit multiple
1472 assert!(!selected.is_empty());
1473 }
1474
1475 /// Pin for perf #2: the selection table reflects the routable
1476 /// subset *exactly*, and `select_shard` reads it lock-free.
1477 /// A regression that, say, forgot to refresh after `drain_specific`
1478 /// would let `select_shard` continue routing to the draining
1479 /// shard — observable here as the drained id appearing in
1480 /// `select_shard`'s output.
1481 #[test]
1482 fn selection_table_reflects_active_subset_after_state_transitions() {
1483 // ScalingPolicy::validate rejects `min_shards == 0`; use 1
1484 // and verify we can drain down to the floor via the
1485 // official `drain_specific` API.
1486 let policy = ScalingPolicy {
1487 min_shards: 1,
1488 max_shards: 8,
1489 cooldown: Duration::from_nanos(1),
1490 ..Default::default()
1491 };
1492 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1493
1494 // Initial: all 3 shards routable.
1495 let mut seen = std::collections::HashSet::new();
1496 for i in 0..500u64 {
1497 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1498 }
1499 assert_eq!(seen, std::collections::HashSet::from([0, 1, 2]));
1500
1501 // Drain shard 1 → only 0 and 2 should be routable.
1502 mapper.drain_specific(1).unwrap();
1503 let mut seen = std::collections::HashSet::new();
1504 for i in 0..500u64 {
1505 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1506 }
1507 assert_eq!(
1508 seen,
1509 std::collections::HashSet::from([0, 2]),
1510 "drained shard must vanish from select_shard output; \
1511 a regression here would let select_shard route to a \
1512 Draining shard (blocking finalization)",
1513 );
1514
1515 // Drain shard 2 → only 0 left (at min_shards floor).
1516 mapper.drain_specific(2).unwrap();
1517 let mut seen = std::collections::HashSet::new();
1518 for i in 0..500u64 {
1519 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1520 }
1521 assert_eq!(seen, std::collections::HashSet::from([0]));
1522 }
1523
1524 /// Pin: `select_shard` does NOT acquire `self.shards` (read or
1525 /// write). The whole perf #2 fix is that the hot path is
1526 /// `ArcSwap::load` only — no parking_lot. We pin this by
1527 /// holding a write lock on `shards` and asserting
1528 /// `select_shard` still returns immediately.
1529 #[test]
1530 fn select_shard_does_not_acquire_shards_lock() {
1531 let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1532
1533 // Hold the write lock. If `select_shard` tried to acquire
1534 // any flavor of `shards.{read,write}()` it would block
1535 // forever — parking_lot is not recursive.
1536 let _guard = mapper.shards.write();
1537 let shard = mapper.select_shard(0xDEAD_BEEF);
1538 assert!(shard < 2, "select_shard must return one of [0, 1]");
1539 }
1540
1541 /// `select_shard`'s candidate-index computation must
1542 /// be unbiased across the u64 hash space. Pre-fix used
1543 /// `hash as usize % candidates.len()`, which over-weights low
1544 /// indices when `candidates.len()` is not a power of two.
1545 /// With u64 hashes the bias is small but non-zero and
1546 /// sustains a hot-shard skew over time. Lemire's
1547 /// `(hash * len) >> 64` is unbiased.
1548 ///
1549 /// We test the unbiased property by sampling a uniform
1550 /// distribution of u64 hashes over 3 candidate shards and
1551 /// asserting each bucket gets close to 1/3 of the picks.
1552 /// Empirical bound: ±5% across 30 000 trials with a
1553 /// well-distributed input.
1554 #[test]
1555 fn select_shard_distribution_is_unbiased() {
1556 // 3 candidates: a non-power-of-2 to expose the modulo
1557 // bias the fix removes.
1558 let mapper = ShardMapper::new(3, 1024, ScalingPolicy::default()).unwrap();
1559
1560 let trials = 30_000u64;
1561 // Spread inputs uniformly across the u64 range so the
1562 // multiply-shift mapping behaves as designed.
1563 let stride = u64::MAX / trials;
1564 let mut counts = [0u64; 3];
1565 for i in 0..trials {
1566 let h = i.wrapping_mul(stride);
1567 let id = mapper.select_shard(h);
1568 counts[id as usize] += 1;
1569 }
1570
1571 let expected = (trials / 3) as i64;
1572 for (id, &count) in counts.iter().enumerate() {
1573 let diff = (count as i64 - expected).abs();
1574 let pct = (diff as f64 / expected as f64) * 100.0;
1575 assert!(
1576 pct < 5.0,
1577 "shard {} bucket has {} hits ({:.2}% off expected {}); \
1578 modulo bias would drift higher on certain shards",
1579 id,
1580 count,
1581 pct,
1582 expected
1583 );
1584 }
1585 }
1586
1587 #[test]
1588 fn test_scale_up() {
1589 // Explicitly set max_shards to allow scaling from 2 to 4
1590 let policy = ScalingPolicy {
1591 max_shards: 8,
1592 ..Default::default()
1593 };
1594 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1595
1596 let new_ids = mapper.scale_up(2).unwrap();
1597 assert_eq!(new_ids.len(), 2);
1598 assert_eq!(mapper.active_shard_count(), 4);
1599 }
1600
1601 #[test]
1602 fn test_scale_up_max_limit() {
1603 let policy = ScalingPolicy {
1604 max_shards: 4,
1605 ..Default::default()
1606 };
1607 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1608
1609 let result = mapper.scale_up(1);
1610 assert!(matches!(result, Err(ScalingError::AtMaxShards)));
1611 }
1612
1613 #[test]
1614 fn test_scale_down() {
1615 let policy = ScalingPolicy {
1616 min_shards: 1,
1617 cooldown: Duration::from_nanos(1), // Disable cooldown for test
1618 ..Default::default()
1619 };
1620 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1621
1622 let drained = mapper.scale_down(2).unwrap();
1623 assert_eq!(drained.len(), 2);
1624 assert_eq!(mapper.active_shard_count(), 2);
1625 }
1626
1627 #[test]
1628 fn test_scale_down_min_limit() {
1629 let policy = ScalingPolicy {
1630 min_shards: 4,
1631 ..Default::default()
1632 };
1633 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1634
1635 let result = mapper.scale_down(1);
1636 assert!(matches!(result, Err(ScalingError::AtMinShards)));
1637 }
1638
1639 #[test]
1640 fn test_metrics_collection() {
1641 let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1642
1643 // Record some metrics
1644 if let Some(collector) = mapper.metrics_collector(0) {
1645 collector.record_buffer_len(512);
1646 collector.record_push(5);
1647 collector.record_push(10);
1648 }
1649
1650 let metrics = mapper.collect_metrics();
1651 assert_eq!(metrics.len(), 2);
1652
1653 let shard0_metrics = metrics.iter().find(|m| m.shard_id == 0).unwrap();
1654 assert!(shard0_metrics.fill_ratio > 0.0);
1655 }
1656
/// Regression: a `record_push` / `record_flush` interleaving with a
/// `collect_and_reset` swap must NOT desync `(sum, count)`.
///
/// Pre-fix `push_latency_sum_ns` and `push_count` were independent atomics.
/// A tick between the two `fetch_add`s could capture the sum without the
/// matching count, or the count without the sum. The resulting
/// `avg = sum.checked_div(count).unwrap_or(0)` was then skewed: a window
/// holding surplus sum over-reports the average, and a window holding surplus
/// count under-reports it (down to 0). Either way, the average that drives
/// `evaluate_scaling`'s push-latency scale-up trigger is corrupted.
///
/// Post-fix `(sum, count)` is packed into one `AtomicU64`, so the swap captures
/// both atomically. Every latency recorded here lies in `[1, MAX_LATENCY_NS]`,
/// so any consistent pair with `count > 0` has an integer mean in that same
/// range. The test races one `collect_and_reset` against concurrent pushers
/// and checks two things:
/// - no window's average exceeds the largest recorded latency (a
///   sum-without-count capture breaks this);
/// - the pushes are not lost from the packed word entirely.
#[test]
fn record_push_collect_no_sum_count_desync() {
    use std::sync::Barrier;
    use std::thread;

    const PUSHERS: usize = 4;
    const PUSHES_PER_THREAD: usize = 1_000;
    // Largest latency any pusher records: `(i % MAX_LATENCY_NS) + 1`.
    const MAX_LATENCY_NS: u64 = 100;

    let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
    let barrier = Arc::new(Barrier::new(PUSHERS + 1));

    let mut handles = Vec::with_capacity(PUSHERS);
    for _ in 0..PUSHERS {
        let c = Arc::clone(&collector);
        let b = Arc::clone(&barrier);
        handles.push(thread::spawn(move || {
            b.wait();
            for i in 0..PUSHES_PER_THREAD {
                // Vary latencies across [1, MAX_LATENCY_NS] so window averages
                // aren't trivially the same number.
                c.record_push((i as u64 % MAX_LATENCY_NS) + 1);
            }
        }));
    }

    // Race a collect_and_reset across the pushers.
    barrier.wait();
    let snapshot1 = collector.collect_and_reset();
    for h in handles {
        h.join().unwrap();
    }
    // After all threads finish, drain whatever remains.
    let snapshot2 = collector.collect_and_reset();

    let total_events = snapshot1.event_rate + snapshot2.event_rate;
    assert_eq!(
        total_events as usize,
        PUSHERS * PUSHES_PER_THREAD,
        "all pushes must be accounted for"
    );

    // `events_in_window` is bumped separately from the packed (sum, count)
    // word, so `event_rate` cannot be compared to the captured count exactly.
    // The mean of a consistent (sum, count) pair, however, can never exceed
    // the largest latency that was ever recorded.
    for (window, snap) in [&snapshot1, &snapshot2].iter().enumerate() {
        assert!(
            snap.avg_push_latency_ns <= MAX_LATENCY_NS,
            "regression: window {window} reports avg push latency {} ns, above \
             the largest recorded latency ({MAX_LATENCY_NS} ns) — the swap \
             captured latency sum without its matching count",
            snap.avg_push_latency_ns,
        );
    }

    // All pushes land in the packed word across the two windows, so at least
    // one window has count > 0. With every latency >= 1, that window's mean
    // is >= 1.
    assert!(
        snapshot1.avg_push_latency_ns > 0 || snapshot2.avg_push_latency_ns > 0,
        "regression: {} pushes were recorded but both windows report a zero \
         average push latency",
        PUSHERS * PUSHES_PER_THREAD,
    );
}
1752
/// Once a shard is put into drain mode, `select_shard` must only ever return
/// the id of a shard that is still active.
#[test]
fn test_draining_excludes_from_selection() {
    let mapper = ShardMapper::new(
        2,
        1024,
        ScalingPolicy {
            min_shards: 1,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Move exactly one of the two shards into drain mode.
    assert_eq!(mapper.scale_down(1).unwrap().len(), 1);

    let active_ids = mapper.active_shard_ids();
    assert_eq!(active_ids.len(), 1);

    // Every pick has to land on the single surviving active shard.
    (0..100u64).for_each(|hash| {
        let target = mapper.select_shard(hash);
        assert!(active_ids.contains(&target));
    });
}
1775
/// `validate` rejects an out-of-range fill threshold as well as inverted
/// shard bounds.
#[test]
fn test_policy_validation() {
    // A fill-ratio threshold above 1.0 can never be reached.
    let bad_threshold = ScalingPolicy {
        fill_ratio_threshold: 1.5,
        ..Default::default()
    };
    assert!(bad_threshold.validate().is_err());

    // min_shards > max_shards is only repaired by normalize(); validate()
    // on its own must reject it.
    let inverted_bounds = ScalingPolicy {
        min_shards: 10,
        max_shards: 5,
        ..Default::default()
    };
    assert!(inverted_bounds.validate().is_err());
}
1792
/// `normalize` lifts `max_shards` up to `min_shards` when the bounds are
/// inverted, and the result passes validation.
#[test]
fn test_policy_normalize_auto_adjusts_max_shards() {
    // max_shards (2) deliberately sits below min_shards (8).
    let normalized = ScalingPolicy {
        min_shards: 8,
        max_shards: 2,
        ..Default::default()
    }
    .normalize();

    assert_eq!(
        normalized.max_shards, 8,
        "max_shards should be adjusted to min_shards"
    );
    assert!(
        normalized.validate().is_ok(),
        "normalized policy should be valid"
    );
}
1812
/// Regression: BUG_REPORT.md #7 — `scale_up` previously allocated new shard
/// ids as `shards.iter().max() + 1`, which reused ids after the
/// highest-numbered shard was drained-and-removed. Reusing ids merges two
/// distinct shard lifetimes in any external metric/checkpoint system that keys
/// on shard id. The fix uses a monotonic `next_shard_id` counter.
///
/// The scaling calls below run back-to-back under a 1ns cooldown. On
/// platforms where `Instant` has coarse resolution, consecutive calls can see
/// the same timestamp and be rejected with `InCooldown`. Those rejections are
/// retried, so the test fails only on a genuine id-reuse bug.
#[test]
fn scale_up_does_not_reuse_ids_after_remove() {
    // Runs `op` until it stops failing with `InCooldown`. Any other error is a
    // real test failure.
    fn past_cooldown<T>(mut op: impl FnMut() -> Result<T, ScalingError>) -> T {
        loop {
            match op() {
                Ok(value) => return value,
                Err(ScalingError::InCooldown) => {
                    std::thread::sleep(Duration::from_micros(10));
                }
                Err(e) => panic!("unexpected scaling error: {e:?}"),
            }
        }
    }

    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 16,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(2, 1024, policy).unwrap();

    // Initial ids are 0 and 1. Scale up to 4: new ids must be 2 and 3, the
    // next two slots from the monotonic counter.
    let new_ids = past_cooldown(|| mapper.scale_up(2));
    assert_eq!(new_ids, vec![2, 3]);

    // Drain one shard. `scale_down` picks by lowest weight, so the test does
    // not choose which id is drained. Whichever it is, force it through
    // Stopped + remove, then check that the removed id is *not* reissued by
    // the next scale_up.
    let drained = past_cooldown(|| mapper.scale_down(1));
    let drained_id = drained[0];
    for shard in mapper.shards.write().iter_mut() {
        if shard.id == drained_id {
            shard.state = ShardState::Stopped;
        }
    }
    let removed = mapper.remove_stopped_shards();
    assert_eq!(removed, vec![drained_id]);

    // Scale up by 1. The broken `max(existing_ids) + 1` allocator would revive
    // the just-removed id whenever `drained_id` was the highest id present
    // (e.g. id 3 with 0/1/2/3 → 0/1/2 → next would be 3 again). The monotonic
    // counter sits at 4 after the earlier scale_up, so the new id must be 4
    // regardless of which id was drained.
    let new_ids = past_cooldown(|| mapper.scale_up(1));
    assert_eq!(
        new_ids,
        vec![4],
        "shard id {drained_id} was just removed; reusing any \
         previously-issued id would merge two distinct shard \
         lifetimes in external systems"
    );
}
1864
/// A policy whose bounds are already consistent comes out of `normalize`
/// unchanged.
#[test]
fn test_policy_normalize_preserves_valid_config() {
    let normalized = ScalingPolicy {
        min_shards: 4,
        max_shards: 16,
        ..Default::default()
    }
    .normalize();

    assert_eq!(normalized.min_shards, 4);
    assert_eq!(normalized.max_shards, 16);
}
1878
/// `ShardMapper::new` normalizes the policy it is given. A `min_shards` of 4
/// is therefore accepted even where the default `max_shards` is lower.
#[test]
fn test_shard_mapper_normalizes_policy() {
    let min_four = ScalingPolicy {
        min_shards: 4,
        ..Default::default()
    };

    // Must hold on hosts with fewer than 4 CPUs as well.
    let outcome = ShardMapper::new(4, 1024, min_four);
    assert!(
        outcome.is_ok(),
        "ShardMapper should normalize policy automatically"
    );
}
1895
/// `ShardMapper::new` widens `max_shards` to cover `initial_shards`, even when
/// the default cap (CPU count) is smaller than the requested initial count.
#[test]
fn test_shard_mapper_adjusts_max_shards_to_initial_count() {
    // Eight initial shards must be accepted even on a 2-core host.
    let result = ShardMapper::new(8, 1024, ScalingPolicy::default());
    assert!(
        result.is_ok(),
        "ShardMapper should adjust max_shards to initial_shards"
    );

    let mapper = result.unwrap();
    assert_eq!(mapper.active_shard_count(), 8);

    // The effective policy must have been widened to match.
    let effective_max = mapper.policy().max_shards;
    assert!(
        effective_max >= 8,
        "max_shards should be at least initial_shards"
    );
}
1918
/// Concurrent `scale_up(3)` calls racing toward `max_shards` must:
/// - never push the active count past the cap;
/// - let at least the first caller through;
/// - reject every other caller with a budget (`AtMaxShards`) or cooldown
///   (`InCooldown`) error and nothing else.
#[test]
fn test_scale_up_max_shards_concurrent() {
    use std::sync::Arc;
    use std::thread;

    let policy = ScalingPolicy {
        max_shards: 10,
        cooldown: Duration::from_nanos(1), // Disable cooldown for test
        ..Default::default()
    };
    let mapper = Arc::new(ShardMapper::new(5, 1024, policy).unwrap());

    // Spawn multiple threads that all try to scale up.
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let m = Arc::clone(&mapper);
            thread::spawn(move || m.scale_up(3))
        })
        .collect();
    let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // The first caller to take the lock finds a fresh mapper (no cooldown
    // recorded yet) with 5 of 10 slots free, so it must succeed.
    let successes = results.iter().filter(|r| r.is_ok()).count();
    assert!(
        successes >= 1,
        "at least one scale_up must succeed, got {results:?}"
    );

    // Every other caller is turned away by the cap or the cooldown window.
    for r in &results {
        assert!(
            matches!(
                r,
                Ok(_) | Err(ScalingError::AtMaxShards) | Err(ScalingError::InCooldown)
            ),
            "unexpected scale_up result {r:?}"
        );
    }

    // One successful +3 brings the count to 8. Shards are never removed
    // here, and the count must never exceed max_shards.
    let active = mapper.active_shard_count();
    assert!(active <= 10, "should never exceed max_shards, got {}", active);
    assert!(
        active >= 8,
        "a successful scale_up(3) from 5 shards must leave at least 8, got {}",
        active
    );
}
1962
/// Repeated `scale_up_provisioning(1)` calls must never let later `activate()`
/// calls drive `active_count` above `max_shards`.
///
/// Before the fix, the budget gate (`check_scale_up_budget`) only counted
/// shards that were ALREADY active. Several provisioning requests could each
/// be granted, and each `activate()` then bumped `active_count`
/// unconditionally, overshooting the cap.
///
/// Scenario: at the budget edge, two Provisioning shards are allocated in a
/// row. Both are granted because `active_count` has not moved yet. The first
/// `activate()` consumes the last slot; the second must fail with
/// `AtMaxShards` instead of overflowing.
#[test]
fn activate_rejects_when_active_count_would_exceed_max_shards() {
    let mapper = ShardMapper::new(
        3,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 4,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // 3 active against a cap of 4: both requests below see active_count == 3.
    let first = mapper.scale_up_provisioning(1).unwrap();

    // If the first request finished within one timer tick, the 1ns cooldown
    // may not have elapsed yet; keep retrying until it has.
    let second = loop {
        match mapper.scale_up_provisioning(1) {
            Err(ScalingError::InCooldown) => std::thread::sleep(Duration::from_nanos(50)),
            Err(e) => panic!("unexpected error from scale_up_provisioning: {:?}", e),
            Ok(ids) => break ids,
        }
    };
    assert_eq!(first.len(), 1);
    assert_eq!(second.len(), 1);

    // Activating the first one fills the budget exactly (3 → 4).
    mapper
        .activate(first[0])
        .expect("first activate must succeed");
    assert_eq!(mapper.active_shard_count(), 4);

    // Activating the second would go to 5 and must be refused.
    let err = mapper
        .activate(second[0])
        .expect_err("second activate must reject");
    assert!(
        matches!(err, ScalingError::AtMaxShards),
        "expected AtMaxShards, got {:?}",
        err
    );

    // The refusal must leave the count at the cap, not above it.
    assert_eq!(mapper.active_shard_count(), 4);
}
2021
/// Pin: two threads activating provisioning shards at the budget edge can
/// never push `active_count` past `max_shards`.
///
/// Before the fix, `activate` released the shards write-lock BEFORE its
/// `fetch_add(1)`. Two concurrent activators could then both pass the budget
/// gate on the pre-bump count and both bump it, overshooting the cap by one.
#[test]
fn concurrent_activate_never_exceeds_max_shards() {
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 200;

    for iter in 0..ITERATIONS {
        // Three active shards against a cap of four, plus one provisioning id
        // per thread. Under the old race, both threads read count=3, both
        // flip their shard to Active and both bump, ending at 5. Under the
        // fix, whichever thread is second sees count=4 while holding the lock
        // and returns AtMaxShards.
        let mapper = Arc::new(
            ShardMapper::new(
                3,
                1024,
                ScalingPolicy {
                    min_shards: 1,
                    max_shards: 4,
                    cooldown: Duration::from_nanos(1),
                    ..Default::default()
                },
            )
            .unwrap(),
        );
        let ids_a = mapper.scale_up_provisioning(1).unwrap();
        // The second request can land inside the 1ns cooldown when the first
        // completes within one timer tick; retry until the cooldown expires.
        let ids_b = loop {
            match mapper.scale_up_provisioning(1) {
                Err(ScalingError::InCooldown) => std::thread::sleep(Duration::from_micros(10)),
                Err(e) => panic!("unexpected error: {:?}", e),
                Ok(v) => break v,
            }
        };
        assert_eq!(ids_a.len(), 1);
        assert_eq!(ids_b.len(), 1);

        let barrier = Arc::new(Barrier::new(2));
        // Each activator waits on the shared barrier so the two `activate`
        // calls start as close together as possible.
        let spawn_activator = |id| {
            let m = Arc::clone(&mapper);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                m.activate(id)
            })
        };
        let id_a = ids_a[0];
        let id_b = ids_b[0];
        let h1 = spawn_activator(id_a);
        let h2 = spawn_activator(id_b);

        let r1 = h1.join().expect("thread A panicked");
        let r2 = h2.join().expect("thread B panicked");

        // One winner and one AtMaxShards rejection, nothing else.
        let outcomes = [&r1, &r2];
        let ok_count = outcomes.iter().filter(|r| r.is_ok()).count();
        let at_max_count = outcomes
            .iter()
            .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
            .count();
        assert_eq!(
            ok_count, 1,
            "iter {}: expected exactly one Ok, got r1={:?} r2={:?}",
            iter, r1, r2
        );
        assert_eq!(
            at_max_count, 1,
            "iter {}: expected exactly one AtMaxShards, got r1={:?} r2={:?}",
            iter, r1, r2
        );

        // Once both activators have finished, the count must sit at or below
        // the cap.
        assert!(
            mapper.active_shard_count() <= 4,
            "iter {}: active_count={} exceeded max_shards=4",
            iter,
            mapper.active_shard_count(),
        );
    }
}
2117
/// Two `scale_up(1)` calls racing inside one cooldown window must not both
/// succeed.
///
/// The old code checked the cooldown under a read lock and dropped it before
/// taking the mutating write lock. Both threads could therefore observe a
/// missing or stale `last_scaling`, take the write lock one after the other,
/// and both succeed. That bypassed the cooldown floor and, in bounded
/// scenarios, potentially the `max_shards` cap too.
///
/// Pin: in each of `ITERATIONS` two-thread races, exactly one call succeeds
/// and the other returns `InCooldown`.
#[test]
fn cooldown_is_enforced_under_concurrent_scale_up() {
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 1_000;

    for iter in 0..ITERATIONS {
        // A one-minute cooldown cannot expire between the two racing calls.
        let mapper = Arc::new(
            ShardMapper::new(
                2,
                1024,
                ScalingPolicy {
                    min_shards: 1,
                    max_shards: 16,
                    cooldown: Duration::from_secs(60),
                    ..Default::default()
                },
            )
            .unwrap(),
        );
        let barrier = Arc::new(Barrier::new(2));

        // Both scalers rendezvous on the barrier before calling scale_up.
        let spawn_scaler = || {
            let m = Arc::clone(&mapper);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                m.scale_up(1)
            })
        };
        let h1 = spawn_scaler();
        let h2 = spawn_scaler();

        let r1 = h1.join().unwrap();
        let r2 = h2.join().unwrap();

        let outcomes = [&r1, &r2];
        let oks = outcomes.iter().filter(|r| r.is_ok()).count();
        let cooldowns = outcomes
            .iter()
            .filter(|r| matches!(r, Err(ScalingError::InCooldown)))
            .count();

        assert_eq!(
            oks, 1,
            "iter {iter}: expected exactly one Ok, got r1={r1:?}, r2={r2:?}"
        );
        assert_eq!(
            cooldowns, 1,
            "iter {iter}: expected exactly one InCooldown, got r1={r1:?}, r2={r2:?}"
        );
        assert_eq!(
            mapper.active_shard_count(),
            3,
            "iter {iter}: cooldown violated — both calls mutated state (shard count {})",
            mapper.active_shard_count()
        );
    }
}
2188
/// A `scale_up` whose ids would run past `u16::MAX` is rejected up front,
/// while a smaller request that still fits is honoured.
#[test]
fn test_scale_up_overflow_protection() {
    let mapper = ShardMapper::new(
        1,
        1024,
        ScalingPolicy {
            max_shards: u16::MAX,
            ..Default::default()
        },
    )
    .unwrap();

    // The id allocator only ever advances by one per shard, so jump it
    // straight to the edge: the next id it would hand out is u16::MAX - 1.
    mapper
        .next_shard_id
        .store(u16::MAX - 1, AtomicOrdering::Relaxed);

    // Three ids would need MAX-1, MAX and MAX+1; the last is not a valid u16.
    assert!(matches!(mapper.scale_up(3), Err(ScalingError::AtMaxShards)));

    // A single id (MAX - 1) still fits.
    assert!(mapper.scale_up(1).is_ok());
}
2216
/// With `auto_scale` switched off, `evaluate_scaling` never recommends a change.
#[test]
fn test_evaluate_scaling_auto_scale_disabled() {
    let manual_only = ScalingPolicy {
        auto_scale: false,
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, manual_only).unwrap();

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2228
/// While a recent scaling operation is still within the cooldown window,
/// `evaluate_scaling` returns `None`.
#[test]
fn test_evaluate_scaling_in_cooldown() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            cooldown: Duration::from_secs(60),
            ..Default::default()
        },
    )
    .unwrap();

    // Record a scaling operation "just now" so the 60s cooldown is live.
    *mapper.last_scaling.write() = Some(Instant::now());

    let verdict = mapper.evaluate_scaling();
    assert!(matches!(verdict, ScalingDecision::None));
}
2243
/// Every shard reporting a fill ratio above the threshold yields a scale-up
/// recommendation.
#[test]
fn test_evaluate_scaling_scale_up_on_high_fill_ratio() {
    let policy = ScalingPolicy {
        fill_ratio_threshold: 0.7,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, policy).unwrap();

    // 0.9 is comfortably above the 0.7 threshold. The write guard is a
    // temporary, released at the end of this statement.
    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|shard| shard.last_metrics.fill_ratio = 0.9);

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleUp(_)
    ));
}
2265
/// Every shard reporting a push latency above the threshold yields a scale-up
/// recommendation.
#[test]
fn test_evaluate_scaling_scale_up_on_high_latency() {
    let policy = ScalingPolicy {
        push_latency_threshold_ns: 10,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, policy).unwrap();

    // 100ns is well over the 10ns threshold.
    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|shard| shard.last_metrics.avg_push_latency_ns = 100);

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleUp(_)
    ));
}
2287
/// Shards that are nearly empty and idle yield a scale-down recommendation
/// while the count is above `min_shards`.
#[test]
fn test_evaluate_scaling_scale_down_on_underutilized() {
    let policy = ScalingPolicy {
        underutilized_threshold: 0.2,
        min_shards: 2,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, policy).unwrap();

    // 5% fill (under the 0.2 threshold) and no traffic on every shard.
    mapper.shards.write().iter_mut().for_each(|shard| {
        shard.last_metrics.fill_ratio = 0.05;
        shard.last_metrics.event_rate = 0;
    });

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleDown(_)
    ));
}
2310
/// Regression: a freshly-activated shard must NOT immediately count toward the
/// underutilized tally on the next `evaluate_scaling`.
///
/// Pre-fix, the new shard's `last_metrics` was the `ShardMetrics::new(id)`
/// placeholder (`fill_ratio = 0.0, event_rate = 0`). That matched the
/// underutilized trigger immediately and made the system oscillate:
/// scale-up → next tick scale-down → next tick scale-up.
///
/// Post-fix, `MappedShard.activated_at` is stamped at the Provisioning →
/// Active transition (and at scale-up construction for shards created
/// `Active`). `evaluate_scaling` skips shards still within `policy.cooldown`
/// of activation.
///
/// The test has two parts:
/// 1. The boot shards alone form an underutilized majority, so scale-down
///    still fires, and only the fresh shard is inside the warmup window.
/// 2. The decisive case: the fresh shard's placeholder vote is the only thing
///    that could turn one underutilized boot shard into a majority, so
///    scale-down must NOT fire.
///
/// Part 1 passes whether or not the warmup skip exists; part 2 fails if it
/// regresses.
#[test]
fn freshly_added_shard_skipped_from_evaluate_scaling_warmup() {
    // Short cooldown so "outside warmup" can be pinned with a small,
    // `checked_sub`-safe offset (200ms). Production stamps boot shards 1 hour
    // in the past, but that subtraction falls back to `Instant::now()` on
    // hosts with sub-1h uptime (Windows Instant is bounded by boot), which
    // would put every shard inside the warmup window. Boot shards are
    // overridden explicitly so the test does not depend on system uptime.
    let make_policy = || ScalingPolicy {
        underutilized_threshold: 0.2,
        min_shards: 1,
        cooldown: Duration::from_millis(100),
        ..Default::default()
    };
    let old_ts = Instant::now()
        .checked_sub(Duration::from_millis(200))
        .expect("test host uptime should exceed 200ms");

    // --- Part 1 ---------------------------------------------------------
    // Two boot shards are underutilized and safely outside warmup. One
    // freshly-activated shard (`activated_at = now()`) has placeholder
    // metrics that ALSO match the underutilized predicate.
    let mapper = ShardMapper::new(3, 1024, make_policy()).unwrap();
    {
        let mut shards = mapper.shards.write();
        for shard in shards.iter_mut() {
            shard.last_metrics.fill_ratio = 0.05;
            shard.last_metrics.event_rate = 0;
            // Pin boot shards safely outside the cooldown.
            shard.activated_at = old_ts;
        }
        // The third shard is "fresh": stamp it as just activated.
        shards[2].activated_at = Instant::now();
    }

    // Only the fresh shard may satisfy the warmup predicate.
    let now = Instant::now();
    let warmup_excluded: Vec<u16> = mapper
        .shards
        .read()
        .iter()
        .filter(|s| now.duration_since(s.activated_at) < mapper.policy.cooldown)
        .map(|s| s.id)
        .collect();
    assert_eq!(
        warmup_excluded,
        vec![2u16],
        "regression: only shard id 2 (just-stamped) should be \
         within the warmup window; boot shards are stamped \
         200ms in the past (well outside the 100ms cooldown)"
    );

    // The two underutilized boot shards already exceed the 3/2 = 1 majority
    // by themselves, so scale-down still fires.
    let decision = mapper.evaluate_scaling();
    assert!(
        matches!(decision, ScalingDecision::ScaleDown(_)),
        "scale-down still fires from the boot shards' real \
         underutilization, but driven by 2 of 3 not 3 of 3 — \
         got {:?}",
        decision,
    );

    // --- Part 2 ---------------------------------------------------------
    // Only ONE boot shard is underutilized; the other is busy (between the
    // underutilized and scale-up fill thresholds, with traffic). If the fresh
    // shard's placeholder were allowed to vote, underutilized shards would
    // number 2 of 3 (a majority) and scale-down would fire. With the warmup
    // skip, only 1 shard votes underutilized, which is not a majority.
    let mapper = ShardMapper::new(3, 1024, make_policy()).unwrap();
    {
        let mut shards = mapper.shards.write();
        for shard in shards.iter_mut() {
            shard.activated_at = old_ts;
        }
        // Boot shard 0: genuinely underutilized.
        shards[0].last_metrics.fill_ratio = 0.05;
        shards[0].last_metrics.event_rate = 0;
        // Boot shard 1: busy, clearly not underutilized.
        shards[1].last_metrics.fill_ratio = 0.5;
        shards[1].last_metrics.event_rate = 1_000_000;
        // Shard 2: just activated, still carrying placeholder metrics.
        shards[2].last_metrics.fill_ratio = 0.0;
        shards[2].last_metrics.event_rate = 0;
        shards[2].activated_at = Instant::now();
    }

    let decision = mapper.evaluate_scaling();
    assert!(
        !matches!(decision, ScalingDecision::ScaleDown(_)),
        "regression: only one boot shard is underutilized; the fresh \
         shard's placeholder metrics must not supply the second vote \
         that forms a majority — got {:?}",
        decision,
    );
}
2400
/// Even with every shard over the fill threshold, no scale-up is recommended
/// once the mapper is already at `max_shards`.
#[test]
fn test_evaluate_scaling_no_scale_up_at_max() {
    // Four initial shards against a cap of four: no room to grow.
    let capped = ScalingPolicy {
        fill_ratio_threshold: 0.7,
        max_shards: 4,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, capped).unwrap();

    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|shard| shard.last_metrics.fill_ratio = 0.9);

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2422
/// Even with every shard idle, no scale-down is recommended once the mapper is
/// already at `min_shards`.
#[test]
fn test_evaluate_scaling_no_scale_down_at_min() {
    // Four initial shards against a floor of four: no room to shrink.
    let floored = ScalingPolicy {
        underutilized_threshold: 0.2,
        min_shards: 4,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, floored).unwrap();

    mapper.shards.write().iter_mut().for_each(|shard| {
        shard.last_metrics.fill_ratio = 0.05;
        shard.last_metrics.event_rate = 0;
    });

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2445
/// Shards in the `Draining` state do not vote, so their high fill ratios
/// cannot trigger a scale-up.
#[test]
fn test_evaluate_scaling_ignores_draining_shards() {
    let policy = ScalingPolicy {
        fill_ratio_threshold: 0.7,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, policy).unwrap();

    // Overloaded, but every shard is already draining.
    mapper.shards.write().iter_mut().for_each(|shard| {
        shard.last_metrics.fill_ratio = 0.9;
        shard.state = ShardState::Draining;
    });

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2469
/// `ShardMetrics::new` records the shard id and starts with zero load and no
/// drain flag.
#[test]
fn test_shard_metrics_new() {
    let ShardMetrics {
        shard_id,
        fill_ratio,
        event_rate,
        draining,
        ..
    } = ShardMetrics::new(5);

    assert_eq!(shard_id, 5);
    assert_eq!(fill_ratio, 0.0);
    assert_eq!(event_rate, 0);
    assert!(!draining);
}
2478
/// A shard carrying fill, latency and traffic gets a strictly positive weight.
#[test]
fn test_shard_metrics_compute_weight() {
    let mut loaded = ShardMetrics {
        fill_ratio: 0.5,
        avg_push_latency_ns: 100,
        event_rate: 1_000_000,
        ..ShardMetrics::new(0)
    };

    loaded.compute_weight();

    assert!(loaded.weight > 0.0);
}
2489
/// A draining shard is pinned to the maximum weight, making it the least
/// attractive target.
#[test]
fn test_shard_metrics_draining_max_weight() {
    let mut draining = ShardMetrics {
        draining: true,
        ..ShardMetrics::new(0)
    };

    draining.compute_weight();

    assert_eq!(draining.weight, f64::MAX);
}
2497
/// The `Debug` output of each `ScalingDecision` variant names that variant.
#[test]
fn test_scaling_decision_debug() {
    let cases = [
        (ScalingDecision::None, "None"),
        (ScalingDecision::ScaleUp(2), "ScaleUp"),
        (ScalingDecision::ScaleDown(1), "ScaleDown"),
    ];

    for (decision, variant_name) in &cases {
        assert!(format!("{:?}", decision).contains(*variant_name));
    }
}
2508
/// Regression: BUG_REPORT.md #46 — a shard allocated by
/// `scale_up_provisioning` stays invisible to `select_shard` until `activate`
/// runs. `EventBus::add_shard_internal` relies on this ordering to attach
/// drain workers before any producer can reach the new ring buffer.
#[test]
fn provisioning_shard_is_not_selectable_until_activated() {
    use std::collections::HashSet;

    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 16,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(2, 1024, policy).unwrap();

    // Collect the targets chosen for 10_000 hashes spread evenly over the u64
    // range. Small sequential inputs would all land on index 0 under the
    // unbiased multiply-shift (Lemire) reduction, since
    // `(small * len) >> 64 == 0`. Production feeds `xxh3_64` payload hashes,
    // which are uniform u64s, and this sampling mirrors that.
    let sample_targets = |m: &ShardMapper| -> HashSet<u16> {
        let stride = u64::MAX / 10_000;
        (0u64..10_000)
            .map(|i| m.select_shard(i.wrapping_mul(stride)))
            .collect()
    };

    // Boot ids are 0 and 1, so the provisioning shard receives id 2.
    assert_eq!(mapper.scale_up_provisioning(1).unwrap(), vec![2]);

    // It is not counted as active yet and reports the Provisioning state.
    assert_eq!(mapper.active_shard_count(), 2);
    assert_eq!(mapper.shard_state(2), Some(ShardState::Provisioning));

    let before = sample_targets(&mapper);
    assert!(
        !before.contains(&2),
        "provisioning shard 2 was selected — \
         producers would push into a ring buffer with no consumer (#46)"
    );
    assert!(before == [0, 1].into_iter().collect());

    // Activation makes the shard a legitimate routing target.
    mapper.activate(2).unwrap();
    assert_eq!(mapper.shard_state(2), Some(ShardState::Active));
    assert_eq!(mapper.active_shard_count(), 3);

    let after = sample_targets(&mapper);
    assert!(
        after.contains(&2),
        "after activate, shard 2 should be a valid select_shard target"
    );
}
2568
/// Regression: BUG_REPORT.md #51 — when no `Active` shard exists (e.g. all
/// shards are Draining or Provisioning), `select_shard` must NOT fall back to a
/// Draining shard. Pushing into a draining shard increments
/// `pushes_since_drain_start` and blocks finalization indefinitely.
#[test]
fn select_shard_does_not_fall_back_to_draining() {
    // The `min_shards` floor plays no part here. The all-draining state below
    // is forced by mutating shard state directly, bypassing `scale_down`.
    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(2, 1024, policy).unwrap();

    // Force every shard into Draining directly, modelling a state that
    // `drain_specific` calls could otherwise reach.
    //
    // Direct field mutation skips the selection-table rebuild that the public
    // API performs, so it is invoked explicitly here, as `drain_specific`
    // would have done.
    {
        let mut shards = mapper.shards.write();
        for s in shards.iter_mut() {
            s.state = ShardState::Draining;
            s.metrics.set_draining(true);
        }
        mapper.rebuild_selection_table_locked(&shards);
    }

    // The fallback must return the out-of-bounds sentinel `u16::MAX`, never a
    // draining shard id (0 or 1). The upstream `resolve_idx` path finds no
    // match for `u16::MAX` and surfaces `Unrouted` (#44), which is the right
    // signal: "no destination, do not push."
    //
    // Probe small hashes and hashes spread across the whole u64 range. Under
    // a multiply-shift reduction, small inputs alone would only ever exercise
    // index 0.
    let stride = u64::MAX / 1_000;
    let spread = (0u64..1_000).map(|i| i.wrapping_mul(stride));
    for hash in (0u64..1_000).chain(spread) {
        let picked = mapper.select_shard(hash);
        assert_eq!(
            picked,
            u16::MAX,
            "fallback returned a draining shard ({}) for hash {:#x}; pushes \
             there would deadlock finalize_draining (#51)",
            picked,
            hash
        );
    }
}
2622
2623 /// Regression: BUG_REPORT.md #32 — `scale_up(0)` previously
2624 /// bumped the cooldown timestamp and could spuriously fail at
2625 /// `u16::MAX` even though zero ids were being allocated.
2626 #[test]
2627 fn scale_up_zero_is_a_noop() {
2628 let policy = ScalingPolicy {
2629 cooldown: Duration::from_secs(60),
2630 ..Default::default()
2631 };
2632 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2633 // Pretend we just scaled — cooldown is active.
2634 *mapper.last_scaling.write() = Some(Instant::now());
2635
2636 // scale_up(0) must not return InCooldown and must not bump
2637 // last_scaling.
2638 let before_ts = *mapper.last_scaling.read();
2639 let r = mapper.scale_up(0);
2640 assert!(
2641 r.is_ok(),
2642 "scale_up(0) should succeed as a no-op, got {r:?}"
2643 );
2644 assert_eq!(r.unwrap().len(), 0);
2645 let after_ts = *mapper.last_scaling.read();
2646 assert_eq!(before_ts, after_ts, "scale_up(0) bumped cooldown timestamp");
2647
2648 // Also: position the allocator at u16::MAX and verify a
2649 // count==0 call doesn't trip the sentinel check.
2650 mapper
2651 .next_shard_id
2652 .store(u16::MAX, AtomicOrdering::Relaxed);
2653 assert!(mapper.scale_up(0).is_ok());
2654 }
2655
2656 /// Regression: `drain_specific` must bump `last_scaling` so
2657 /// a subsequent `scale_up` is gated by the cooldown floor.
2658 /// Pre-fix `scale_down` wrote `last_scaling` but
2659 /// `drain_specific` did not, so the sequence
2660 /// `drain_specific(id) → scale_up(N)` bypassed the cooldown
2661 /// — even though both decrement `active_count` and so
2662 /// should be symmetric from the budget-math perspective.
2663 #[test]
2664 fn drain_specific_bumps_last_scaling_so_scale_up_respects_cooldown() {
2665 let policy = ScalingPolicy {
2666 min_shards: 1,
2667 max_shards: 8,
2668 // A cooldown long enough that `Instant::now()` won't
2669 // accidentally elapse it during the test, but short
2670 // enough not to slow CI.
2671 cooldown: Duration::from_secs(60),
2672 ..Default::default()
2673 };
2674 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2675
2676 // Pre-condition: no prior scaling action.
2677 assert!(mapper.last_scaling.read().is_none());
2678
2679 let before = Instant::now();
2680 mapper.drain_specific(0).unwrap();
2681 let after_ts = mapper
2682 .last_scaling
2683 .read()
2684 .expect("drain_specific must record a `last_scaling` timestamp");
2685 // Sanity: the recorded timestamp is in the test window.
2686 assert!(
2687 after_ts >= before,
2688 "last_scaling must be bumped to a current Instant (got {:?}, before was {:?})",
2689 after_ts,
2690 before
2691 );
2692
2693 // The decisive sealed property: a follow-up scale_up
2694 // must trip the cooldown gate. Pre-fix this would have
2695 // succeeded immediately because `last_scaling` was never
2696 // written by `drain_specific`.
2697 let err = mapper
2698 .scale_up(1)
2699 .expect_err("scale_up immediately after drain_specific must hit cooldown");
2700 match err {
2701 ScalingError::InCooldown => {} // expected
2702 other => panic!("expected InCooldown after drain_specific, got {:?}", other),
2703 }
2704 }
2705
2706 /// Regression: BUG_REPORT.md #48 — `drain_specific` must
2707 /// transition the shard's `MappedShard.state` to `Draining` so
2708 /// that `select_shard` stops routing to it. The previous
2709 /// `drain_shard` only flipped a metrics atomic.
2710 #[test]
2711 fn drain_specific_takes_shard_out_of_select() {
2712 let policy = ScalingPolicy {
2713 min_shards: 1,
2714 max_shards: 8,
2715 cooldown: Duration::from_nanos(1),
2716 ..Default::default()
2717 };
2718 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2719
2720 // Drain shard 0.
2721 mapper.drain_specific(0).unwrap();
2722 assert_eq!(mapper.shard_state(0), Some(ShardState::Draining));
2723 assert_eq!(mapper.active_shard_count(), 2);
2724
2725 // Across many hashes, select_shard must not pick id 0.
2726 for hash in 0u64..10_000 {
2727 let picked = mapper.select_shard(hash);
2728 assert_ne!(
2729 picked, 0,
2730 "select_shard returned a Draining shard id 0 — \
2731 producer pushes there would block finalize_draining (#48)"
2732 );
2733 }
2734 }
2735
2736 /// Regression: BUG_REPORT.md #33 — `set_draining(true)` and a
2737 /// concurrent `record_push` race on `pushes_since_drain_start`.
2738 /// The race itself can't be eliminated without a CAS loop on
2739 /// the counter (which would penalize the hot path), so the
2740 /// best we can pin is: after the dust settles, the counter
2741 /// value is bounded — never larger than the number of pushes
2742 /// that genuinely overlapped the transition. This catches a
2743 /// regression where the store-zero stops happening, where the
2744 /// flag publish stops happening, or where future code adds
2745 /// drift that compounds the race across many transitions.
2746 #[test]
2747 fn set_draining_resets_counter_under_concurrent_pushes() {
2748 use std::sync::Barrier;
2749 use std::thread;
2750
2751 const ITERATIONS: usize = 200;
2752 const PUSHERS: usize = 4;
2753
2754 for _ in 0..ITERATIONS {
2755 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
2756
2757 // Sanity: counter starts at zero.
2758 assert_eq!(collector.pushes_since_drain_start(), 0);
2759
2760 // Pre-load with a noticeable amount of "before drain"
2761 // pushes so the reset has work to do.
2762 for _ in 0..50 {
2763 collector.record_push(1);
2764 }
2765 assert_eq!(collector.pushes_since_drain_start(), 50);
2766
2767 // Race: every pusher hammers `record_push` while one
2768 // thread calls `set_draining(true)`. After the barrier,
2769 // we want to observe that the counter ends up bounded
2770 // by PUSHERS (the number of pushes that genuinely
2771 // overlapped the transition) — and crucially NOT 50+,
2772 // which is what the buggy code with a missing reset
2773 // would leave behind.
2774 let barrier = Arc::new(Barrier::new(PUSHERS + 1));
2775 let mut handles = Vec::with_capacity(PUSHERS);
2776 for _ in 0..PUSHERS {
2777 let c = Arc::clone(&collector);
2778 let b = Arc::clone(&barrier);
2779 handles.push(thread::spawn(move || {
2780 b.wait();
2781 c.record_push(1);
2782 }));
2783 }
2784
2785 barrier.wait();
2786 collector.set_draining(true);
2787 for h in handles {
2788 h.join().unwrap();
2789 }
2790
2791 // After reset + at-most-PUSHERS racing pushes, the
2792 // counter is bounded. The strong guarantee we want is
2793 // "the 50 pre-drain pushes did NOT survive the reset."
2794 // Anything <= PUSHERS is acceptable — the race may
2795 // count any subset of the racing pushes.
2796 let final_count = collector.pushes_since_drain_start();
2797 assert!(
2798 final_count <= PUSHERS as u64,
2799 "set_draining reset is broken: counter is {} after reset, \
2800 expected at most {} (#33)",
2801 final_count,
2802 PUSHERS
2803 );
2804 }
2805 }
2806
2807 /// Regression: BUG_REPORT.md #49 — `finalize_draining` must
2808 /// drop the `shards` write lock before calling `on_shard_removed`,
2809 /// so a callback that re-enters the mapper (read methods like
2810 /// `shard_state`, `active_shard_ids`, etc.) does not deadlock.
2811 #[test]
2812 fn finalize_draining_does_not_deadlock_on_callback_reentry() {
2813 use std::sync::Mutex;
2814 let policy = ScalingPolicy {
2815 min_shards: 1,
2816 max_shards: 8,
2817 cooldown: Duration::from_nanos(1),
2818 ..Default::default()
2819 };
2820 let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
2821
2822 // Set a callback that re-enters the mapper. Before the fix
2823 // this acquires `shards.read()` while finalize_draining
2824 // still holds `shards.write()`, deadlocking on parking_lot's
2825 // non-recursive RwLock.
2826 type Observation = (u16, Option<ShardState>);
2827 let observed_states: Arc<Mutex<Vec<Observation>>> = Arc::new(Mutex::new(Vec::new()));
2828 {
2829 let mapper_for_cb = Arc::clone(&mapper);
2830 let observed = Arc::clone(&observed_states);
2831 mapper.set_on_shard_removed(move |id| {
2832 let st = mapper_for_cb.shard_state(id);
2833 observed.lock().unwrap().push((id, st));
2834 });
2835 }
2836
2837 // Drive a shard all the way to Stopped: drain it, mark its
2838 // metrics empty (current_len = 0), and drop drain_started
2839 // far enough back that the 100ms gate is satisfied.
2840 mapper.drain_specific(0).unwrap();
2841 {
2842 let mut shards = mapper.shards.write();
2843 let s = shards.iter_mut().find(|s| s.id == 0).unwrap();
2844 s.metrics.current_len.store(0, AtomicOrdering::Relaxed);
2845 s.metrics
2846 .pushes_since_drain_start
2847 .store(0, AtomicOrdering::Relaxed);
2848 // Backdate drain_started so the elapsed > 100ms gate trips.
2849 s.drain_started = Some(Instant::now() - Duration::from_secs(1));
2850 }
2851
2852 // The call below would deadlock with the bug present.
2853 let stopped = mapper.finalize_draining();
2854 assert_eq!(stopped, vec![0]);
2855
2856 // The callback must have run AND been able to read state
2857 // (proving the lock was released).
2858 let observed = observed_states.lock().unwrap().clone();
2859 assert_eq!(observed.len(), 1);
2860 assert_eq!(observed[0].0, 0);
2861 // State should be `Stopped` (set by finalize_draining before
2862 // the lock was dropped).
2863 assert_eq!(observed[0].1, Some(ShardState::Stopped));
2864 }
2865
2866 /// `activate` must signal whether a state transition
2867 /// actually occurred so callers can avoid double-counting.
2868 /// Pre-fix it returned `Result<(), _>`, so a caller that
2869 /// invoked `activate` twice on the same shard incremented its
2870 /// own external counter (e.g. `ShardManager::num_shards`)
2871 /// twice for one logical transition.
2872 #[test]
2873 fn activate_returns_true_on_transition_and_false_on_idempotent_call() {
2874 let policy = ScalingPolicy {
2875 min_shards: 1,
2876 max_shards: 16,
2877 cooldown: Duration::from_nanos(1),
2878 ..Default::default()
2879 };
2880 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2881
2882 // Newly-provisioned shard 2 → Active: real transition.
2883 let new_ids = mapper.scale_up_provisioning(1).unwrap();
2884 assert_eq!(new_ids, vec![2]);
2885 let first = mapper.activate(2).unwrap();
2886 assert!(
2887 first,
2888 "first activate on a Provisioning shard must return true"
2889 );
2890 assert_eq!(mapper.active_shard_count(), 3);
2891
2892 // Second activate on the same already-Active shard:
2893 // idempotent, no transition.
2894 let second = mapper.activate(2).unwrap();
2895 assert!(
2896 !second,
2897 "activate on an already-Active shard must return false; \
2898 pre-fix this returned Ok(()) and the caller couldn't tell"
2899 );
2900 // active_count must NOT have moved.
2901 assert_eq!(
2902 mapper.active_shard_count(),
2903 3,
2904 "idempotent activate must not bump active_count"
2905 );
2906 }
2907}