net/shard/mapper.rs
1//! Dynamic shard mapping and scaling.
2//!
3//! This module implements dynamic shard scaling following Approach B from
4//! DYNAMIC_SHARD_SCALING.md: increasing the number of shards and rebalancing
5//! producers across them, while maintaining SPSC (single-producer single-consumer)
6//! semantics per shard.
7//!
8//! # Core Principles
9//!
10//! - Shards never accept multiple producers
11//! - Producers get moved to new shards when load increases
12//! - Ingestion performance stays at 700M ops/sec per shard
13//! - Ordering guarantees remain intact within a shard
14//! - Total throughput scales linearly with shard count
15//!
16//! # Scaling Triggers
17//!
18//! - Ring buffer fill ratio > threshold (default 70%)
19//! - Push latency exceeds threshold (default 5ns)
20//! - Batch flush latency exceeds threshold
21//! - Session/producer count growth
22//!
23//! # Architecture
24//!
25//! ```text
26//! Producers -----+
27//! |
28//! v
29//! +----------------------------+
30//! | Dynamic Shard Mapper |
31//! +----------------------------+
32//! | | |
33//! v v v
34//! Shard 0 Shard 1 Shard 2 …
35//! ```
36
37use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering as AtomicOrdering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use arc_swap::ArcSwap;
42use parking_lot::RwLock;
43
44use crate::config::ScalingPolicy;
45
/// Point-in-time metrics for one shard, as consumed by the scaling logic.
///
/// Produced once per collection window by
/// `ShardMetricsCollector::collect_and_reset`; `weight` is filled in by
/// [`ShardMetrics::compute_weight`].
#[derive(Debug, Clone)]
pub struct ShardMetrics {
    /// Id of the shard these numbers describe.
    pub shard_id: u16,
    /// Buffer occupancy as a fraction of capacity (0.0 when empty).
    pub fill_ratio: f64,
    /// Number of events ingested during the collection window (a count
    /// per window, not a per-second rate).
    pub event_rate: u64,
    /// Mean push latency over the window, in nanoseconds (0 if no pushes).
    pub avg_push_latency_ns: u64,
    /// Mean batch-flush latency over the window, in microseconds
    /// (0 if no flushes).
    pub avg_flush_latency_us: u64,
    /// `true` while the shard is draining.
    pub draining: bool,
    /// Load-balancing score; smaller means less loaded.
    pub weight: f64,
    /// When this snapshot was produced.
    pub last_updated: Instant,
}
66
67impl ShardMetrics {
68 /// Create new metrics for a shard.
69 pub fn new(shard_id: u16) -> Self {
70 Self {
71 shard_id,
72 fill_ratio: 0.0,
73 event_rate: 0,
74 avg_push_latency_ns: 0,
75 avg_flush_latency_us: 0,
76 draining: false,
77 weight: 0.0,
78 last_updated: Instant::now(),
79 }
80 }
81
82 /// Compute the weight based on current metrics.
83 /// Lower weight = better candidate for new producers.
84 pub fn compute_weight(&mut self) {
85 // Weight formula: combines fill ratio, latency, and event rate
86 // Higher fill ratio = higher weight (avoid overloaded shards)
87 // Higher latency = higher weight
88 // Higher event rate = higher weight
89 let fill_weight = self.fill_ratio * 100.0;
90 let latency_weight = (self.avg_push_latency_ns as f64) / 10.0;
91 let rate_weight = (self.event_rate as f64) / 1_000_000.0;
92
93 self.weight = fill_weight + latency_weight + rate_weight;
94
95 // Draining shards get maximum weight (never assign new producers)
96 if self.draining {
97 self.weight = f64::MAX;
98 }
99 }
100}
101
102/// Live metrics collector for a shard (atomics for lock-free updates).
103#[derive(Debug)]
104pub struct ShardMetricsCollector {
105 /// Shard identifier.
106 shard_id: u16,
107 /// Ring buffer capacity.
108 capacity: usize,
109 /// Current buffer length (updated by shard).
110 current_len: AtomicU64,
111 /// Events ingested in current window.
112 events_in_window: AtomicU64,
113 /// Packed `(count << 32) | sum` for push latencies (ns).
114 /// Pre-fix `push_latency_sum_ns` and `push_count` were
115 /// independent `AtomicU64`s. `record_push` did two separate
116 /// `fetch_add`s, and `collect_and_reset` did two separate
117 /// `swap`s. A metrics tick interleaving between the two
118 /// `fetch_add`s captured the sum WITHOUT the count (or
119 /// vice versa); the resulting `avg = sum.checked_div(count)
120 /// .unwrap_or(0)` returned 0 in window N (sum without
121 /// count) and 0 in window N+1 (count without sum), silently
122 /// zeroing the average that drives `evaluate_scaling`'s
123 /// push-latency scale-up trigger. Packing into one u64 makes
124 /// the `(sum, count)` update atomic; the upper 32 bits hold
125 /// the count (u32::MAX = 4G calls/window — plenty) and the
126 /// lower 32 hold the sum (u32::MAX = 4 G ns ≈ 4 s, also
127 /// plenty for any sane window).
128 push_latency: AtomicU64,
129 /// Packed `(count << 32) | sum` for flush latencies (us).
130 /// Same shape and rationale as `push_latency`. The lower 32
131 /// bits hold sum-µs (u32::MAX ≈ 4 Gµs ≈ 67 minutes — far
132 /// past any plausible window).
133 flush_latency: AtomicU64,
134 /// Whether this shard is draining.
135 draining: AtomicBool,
136 /// Window start time.
137 window_start: RwLock<Instant>,
138 /// Pushes observed since `set_draining(true)` was last called.
139 /// Distinct from `events_in_window` because this counter is NOT
140 /// reset by `collect_and_reset`. `finalize_draining` reads this
141 /// instead of `events_in_window` so a drain-window-overlap with
142 /// a metrics tick can no longer race the counter to zero before
143 /// the producer is observed.
144 pushes_since_drain_start: AtomicU64,
145}
146
147impl ShardMetricsCollector {
148 /// Create a new metrics collector.
149 pub fn new(shard_id: u16, capacity: usize) -> Self {
150 Self {
151 shard_id,
152 capacity,
153 current_len: AtomicU64::new(0),
154 events_in_window: AtomicU64::new(0),
155 push_latency: AtomicU64::new(0),
156 flush_latency: AtomicU64::new(0),
157 draining: AtomicBool::new(false),
158 window_start: RwLock::new(Instant::now()),
159 pushes_since_drain_start: AtomicU64::new(0),
160 }
161 }
162
163 /// Record current buffer length.
164 #[inline]
165 pub fn record_buffer_len(&self, len: usize) {
166 self.current_len.store(len as u64, AtomicOrdering::Relaxed);
167 }
168
169 /// Record an event ingestion.
170 #[inline]
171 pub fn record_push(&self, latency_ns: u64) {
172 self.events_in_window.fetch_add(1, AtomicOrdering::Relaxed);
173 // Atomically add 1 to count (upper 32 bits) and
174 // `latency_ns` to sum (lower 32 bits). `fetch_update`
175 // CAS-loops the load-and-store, so a concurrent
176 // `collect_and_reset` swap on the same word either sees
177 // both pre-add or both post-add — no `(sum, count)`
178 // desync. Saturating ops cap at u32::MAX inside the
179 // pack window (~4 G calls / 4 s of accumulated latency),
180 // which is far beyond any sane metrics tick.
181 let _ =
182 self.push_latency
183 .fetch_update(AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |v| {
184 let count = (v >> 32) as u32;
185 let sum = (v & 0xFFFF_FFFF) as u32;
186 let new_count = count.saturating_add(1) as u64;
187 let new_sum = sum.saturating_add(latency_ns.min(u32::MAX as u64) as u32) as u64;
188 Some((new_count << 32) | new_sum)
189 });
190 // Always increment — the cost is one fetch_add and the
191 // counter only matters when the shard is draining. Cheaper
192 // than branching on `self.draining.load()` in the hot path.
193 self.pushes_since_drain_start
194 .fetch_add(1, AtomicOrdering::Relaxed);
195 }
196
197 /// Record a batch flush.
198 #[inline]
199 pub fn record_flush(&self, latency_us: u64) {
200 // Same packed-`(count, sum)` shape as `record_push` —
201 // see that function for the desync rationale.
202 let _ = self.flush_latency.fetch_update(
203 AtomicOrdering::Relaxed,
204 AtomicOrdering::Relaxed,
205 |v| {
206 let count = (v >> 32) as u32;
207 let sum = (v & 0xFFFF_FFFF) as u32;
208 let new_count = count.saturating_add(1) as u64;
209 let new_sum = sum.saturating_add(latency_us.min(u32::MAX as u64) as u32) as u64;
210 Some((new_count << 32) | new_sum)
211 },
212 );
213 }
214
215 /// Set drain mode.
216 ///
217 /// Transitions to draining reset `pushes_since_drain_start` so
218 /// `finalize_draining` only counts pushes that arrived after the
219 /// drain began.
220 ///
221 /// A concurrent `record_push` can interleave with the store-zero
222 /// on `pushes_since_drain_start` and leave the counter at `1`
223 /// rather than `0`. That's not a correctness bug — it just defers
224 /// finalization of this shard by one metrics tick — but the
225 /// previous code did the store-zero on the counter *before*
226 /// publishing the `draining=true` flag, which made the window
227 /// slightly larger than necessary. Publishing the flag first
228 /// means any push that *observes* `draining=true` is naturally
229 /// sequenced after the reset; pushes that beat the flag publish
230 /// race the reset just like before.
231 ///
232 /// We also use `SeqCst` for the publish to give the rest of the
233 /// crate a single total order on draining transitions, which
234 /// matches the ordering on `try_enter_ingest`'s shutdown flag.
235 pub fn set_draining(&self, draining: bool) {
236 if draining {
237 // Store-zero first (so the flag publish below acts as
238 // the release-fence for both writes).
239 self.pushes_since_drain_start
240 .store(0, AtomicOrdering::SeqCst);
241 }
242 self.draining.store(draining, AtomicOrdering::SeqCst);
243 }
244
245 /// Number of pushes observed since `set_draining(true)` was
246 /// last called. Used by `finalize_draining` to detect lingering
247 /// producers that the window-reset `events_in_window` counter
248 /// can race past.
249 ///
250 /// Pre-fix used `Ordering::Relaxed`, but the writer
251 /// side (`set_draining(true)`) resets the counter under
252 /// `SeqCst`. On weakly-ordered hardware (ARM), a Relaxed
253 /// reader could observe a stale counter and `finalize_draining`
254 /// would falsely conclude the drain had flushed while
255 /// producers were still pushing. Acquire pairs with the
256 /// SeqCst release of the reset (SeqCst includes Release
257 /// semantics), making the reset happen-before this load.
258 pub fn pushes_since_drain_start(&self) -> u64 {
259 self.pushes_since_drain_start.load(AtomicOrdering::Acquire)
260 }
261
262 /// Check if draining.
263 pub fn is_draining(&self) -> bool {
264 self.draining.load(AtomicOrdering::Acquire)
265 }
266
267 /// Collect metrics and reset window counters.
268 ///
269 /// NOTE: The individual atomic swaps below are not collectively atomic with
270 /// respect to concurrent `record_push`/`record_flush` calls. This means a
271 /// push recorded between, say, the `events_in_window` swap and the
272 /// `push_count` swap could be counted in one counter but not the other for
273 /// a given window. This small inaccuracy is an accepted trade-off to
274 /// preserve the lock-free design of the hot path (`record_push` /
275 /// `record_flush`). Adding a `Mutex` here would serialize the hot path
276 /// and defeat the purpose of using atomics.
277 pub fn collect_and_reset(&self) -> ShardMetrics {
278 let current_len = self.current_len.load(AtomicOrdering::Relaxed);
279 let events = self.events_in_window.swap(0, AtomicOrdering::Relaxed);
280 // Single swap captures `(count, sum)` together; no
281 // chance of catching the sum without the matching count
282 // (or vice versa) the way two independent swaps did.
283 let push_packed = self.push_latency.swap(0, AtomicOrdering::Relaxed);
284 let push_count = push_packed >> 32;
285 let push_latency_sum = push_packed & 0xFFFF_FFFF;
286 let flush_packed = self.flush_latency.swap(0, AtomicOrdering::Relaxed);
287 let flush_count = flush_packed >> 32;
288 let flush_latency_sum = flush_packed & 0xFFFF_FFFF;
289
290 let fill_ratio = if self.capacity > 0 {
291 current_len as f64 / self.capacity as f64
292 } else {
293 0.0
294 };
295
296 let avg_push_latency = push_latency_sum.checked_div(push_count).unwrap_or(0);
297
298 let avg_flush_latency = flush_latency_sum.checked_div(flush_count).unwrap_or(0);
299
300 // Reset window
301 *self.window_start.write() = Instant::now();
302
303 let mut metrics = ShardMetrics {
304 shard_id: self.shard_id,
305 fill_ratio,
306 event_rate: events,
307 avg_push_latency_ns: avg_push_latency,
308 avg_flush_latency_us: avg_flush_latency,
309 draining: self.draining.load(AtomicOrdering::Acquire),
310 weight: 0.0,
311 last_updated: Instant::now(),
312 };
313 metrics.compute_weight();
314 metrics
315 }
316}
317
/// Outcome of `ShardMapper::evaluate_scaling`; a recommendation, not an
/// action already taken.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ScalingDecision {
    /// Leave the shard count as it is.
    None,
    /// Add this many shards.
    ScaleUp(u16),
    /// Retire this many shards (they are put into drain mode first).
    ScaleDown(u16),
}
328
/// Reasons a scaling operation can be refused or fail.
#[derive(Clone, Debug)]
pub enum ScalingError {
    /// The supplied scaling policy failed validation; carries the reason.
    InvalidPolicy(String),
    /// The shard count is already at the policy maximum.
    AtMaxShards,
    /// The shard count is already at the policy minimum.
    AtMinShards,
    /// The previous scaling operation is still within its cooldown.
    InCooldown,
    /// Creating a shard failed; carries the reason.
    ShardCreationFailed(String),
}
343
344impl std::fmt::Display for ScalingError {
345 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
346 match self {
347 Self::InvalidPolicy(msg) => write!(f, "invalid scaling policy: {}", msg),
348 Self::AtMaxShards => write!(f, "already at maximum shard count"),
349 Self::AtMinShards => write!(f, "already at minimum shard count"),
350 Self::InCooldown => write!(f, "scaling operation in cooldown"),
351 Self::ShardCreationFailed(msg) => write!(f, "shard creation failed: {}", msg),
352 }
353 }
354}
355
356impl std::error::Error for ScalingError {}
357
/// Lifecycle state of a shard tracked by the mapper.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ShardState {
    /// Allocated but not yet routable.
    ///
    /// The id and metrics collector exist, but `select_shard` must not
    /// route here until the upstream workers (drain / batch) are running.
    /// The caller moves the shard to `Active` via `activate`. This closes
    /// the window in which a freshly added shard accepted pushes before
    /// any consumer existed.
    Provisioning,
    /// Routable: accepting producers.
    Active,
    /// Not accepting new producers; waiting to become empty.
    Draining,
    /// Fully stopped and eligible for removal.
    Stopped,
}
375
376/// Information about a shard managed by the mapper.
377#[derive(Debug)]
378struct MappedShard {
379 /// Shard ID.
380 id: u16,
381 /// Current state.
382 state: ShardState,
383 /// Metrics collector.
384 metrics: Arc<ShardMetricsCollector>,
385 /// When this shard entered drain mode (if draining).
386 drain_started: Option<Instant>,
387 /// Last collected metrics.
388 last_metrics: ShardMetrics,
389 /// When this shard last transitioned to `Active`. Used by
390 /// `evaluate_scaling` to skip recently-activated shards from
391 /// scale decisions: their `last_metrics` is the
392 /// `ShardMetrics::new(id)` placeholder until at least one
393 /// `collect_metrics` cycle has run, and the placeholder
394 /// (`fill_ratio = 0.0, event_rate = 0`) trips the
395 /// underutilized trigger immediately — oscillating the system
396 /// (scale-up → next tick scale-down → next tick scale-up …)
397 /// when a fresh shard is added but hasn't yet absorbed any
398 /// traffic.
399 activated_at: Instant,
400}
401
/// Boxed, thread-shareable hook invoked with a shard id on lifecycle
/// events (shard created / shard removed).
type ShardCallback = Box<dyn Fn(u16) + Sync + Send>;
404
/// Immutable snapshot of the shard ids `select_shard` may return.
///
/// Contains every `Active` shard whose `last_metrics.weight` is within
/// the selection tolerance of the minimum active weight. It is empty when
/// no shard is active, in which case `select_shard` returns `u16::MAX`.
///
/// [`ShardMapper::rebuild_selection_table_locked`] rebuilds it on every
/// change that can alter routability: state transitions, metric/weight
/// refreshes, and scale-up/down. `select_shard` reads it through
/// `ArcSwap::load`, with no allocation or lock per call.
struct SelectionTable {
    /// Routable shard ids, in the mapper's shard-list order.
    ///
    /// A `Box<[u16]>` instead of a `Vec<u16>` because the slice never
    /// changes after construction; dropping the capacity word saves one
    /// word per snapshot.
    candidates: Box<[u16]>,
}
422
423/// Dynamic shard mapper that manages shard allocation and producer routing.
424///
425/// This is the core component for dynamic scaling. It:
426/// - Tracks metrics for all shards
427/// - Makes scaling decisions based on policy
428/// - Routes producers to the least-loaded shards
429/// - Manages shard lifecycle (active → draining → stopped)
430pub struct ShardMapper {
431 /// Mapped shards (RwLock for concurrent reads, rare writes).
432 shards: RwLock<Vec<MappedShard>>,
433 /// Pre-computed routable-shard snapshot. Updated whenever the shard
434 /// list or weights change; read lock-free by [`Self::select_shard`].
435 ///
436 /// Pre-fix [perf #2 in `docs/performance/net-perf-analysis.md`],
437 /// `select_shard` did this work on every event: two `Vec` allocs +
438 /// an `RwLock::read` + min-weight scan + tolerance filter. At 10M
439 /// ev/s that's 20M allocs/sec plus a parking_lot acquire each time.
440 /// The table is rebuilt at most once per metrics tick and once per
441 /// state transition.
442 selection_table: ArcSwap<SelectionTable>,
443 /// Current active shard count.
444 active_count: AtomicU16,
445 /// Scaling policy.
446 ///
447 /// This field is immutable for the lifetime of the mapper. The
448 /// previous `set_policy(&mut self, …)` API was unreachable in
449 /// practice — every production callsite holds the mapper behind
450 /// an `Arc`, and `Arc::get_mut` requires a strong count of 1,
451 /// which never holds once the worker pool clones the `Arc`. The
452 /// method has been removed; recreate the mapper (and
453 /// the bus that owns it) to change the policy.
454 policy: ScalingPolicy,
455 /// Ring buffer capacity for new shards.
456 ring_buffer_capacity: usize,
457 /// Last scaling operation timestamp.
458 ///
459 /// This RwLock is **logically** scoped to the
460 /// outer `shards.write()` lock — `scale_up`, `scale_down`,
461 /// and `scale_up_provisioning` all read this field and write
462 /// to it while holding `shards.write()`. The cooldown gate
463 /// is therefore atomic with the scale mutation: no caller
464 /// can pass the cooldown check, observe stale `last_scaling`,
465 /// and have its mutation interleave with another caller.
466 ///
467 /// **If you narrow `shards.write()`'s scope in any of those
468 /// callers, this implicit serialization breaks.** Either:
469 /// - Keep the cooldown read+write inside the outer lock, OR
470 /// - Use a `compare_exchange`-style update on a single
471 /// `AtomicI64` of nanos so the gate is atomic on its own.
472 ///
473 /// The doc-comment is here so a future refactorer doesn't
474 /// silently break the contract.
475 last_scaling: RwLock<Option<Instant>>,
476 /// Callback for shard creation (provided by ShardManager).
477 on_shard_created: RwLock<Option<ShardCallback>>,
478 /// Callback for shard removal (provided by ShardManager).
479 on_shard_removed: RwLock<Option<ShardCallback>>,
480 /// Monotonic shard-id allocator. The next `scale_up` call gets
481 /// `fetch_add(1)` from here. Distinct from `shards.iter().max() +
482 /// 1`: that approach reused ids whenever the highest-numbered
483 /// shard had been drained-and-removed, silently merging two
484 /// unrelated shard lifetimes in any external system that keys
485 /// metrics or checkpoints on shard id. Monotonic allocation
486 /// ensures every shard ever allocated has a globally unique id
487 /// for the lifetime of this mapper.
488 next_shard_id: AtomicU16,
489}
490
491impl ShardMapper {
492 /// Create a new shard mapper with the given initial shard count and policy.
493 pub fn new(
494 initial_shards: u16,
495 ring_buffer_capacity: usize,
496 policy: ScalingPolicy,
497 ) -> Result<Self, ScalingError> {
498 let mut policy = policy.normalize();
499 // Ensure max_shards can accommodate the initial shard count
500 if policy.max_shards < initial_shards {
501 policy.max_shards = initial_shards;
502 }
503 policy
504 .validate()
505 .map_err(|e| ScalingError::InvalidPolicy(e.to_string()))?;
506
507 // Initial shards: stamp `activated_at` far enough in the
508 // past that they're not subject to the warmup skip in
509 // `evaluate_scaling`. The boot-time shards have whatever
510 // baseline traffic the system serves; they shouldn't be
511 // exempted from scale decisions just because the mapper
512 // was just constructed.
513 let boot = Instant::now()
514 .checked_sub(std::time::Duration::from_secs(3600))
515 .unwrap_or_else(Instant::now);
516 let shards: Vec<MappedShard> = (0..initial_shards)
517 .map(|id| MappedShard {
518 id,
519 state: ShardState::Active,
520 metrics: Arc::new(ShardMetricsCollector::new(id, ring_buffer_capacity)),
521 drain_started: None,
522 last_metrics: ShardMetrics::new(id),
523 activated_at: boot,
524 })
525 .collect();
526
527 let mapper = Self {
528 shards: RwLock::new(shards),
529 selection_table: ArcSwap::from_pointee(SelectionTable {
530 candidates: Box::new([]),
531 }),
532 active_count: AtomicU16::new(initial_shards),
533 policy,
534 ring_buffer_capacity,
535 last_scaling: RwLock::new(None),
536 on_shard_created: RwLock::new(None),
537 on_shard_removed: RwLock::new(None),
538 // Initial shards occupy ids `[0, initial_shards)`, so the
539 // first scale-up takes `initial_shards`.
540 next_shard_id: AtomicU16::new(initial_shards),
541 };
542 // Seed the selection table with the boot shards.
543 mapper.rebuild_selection_table_locked(&mapper.shards.read());
544 Ok(mapper)
545 }
546
547 /// Rebuild the [`Self::selection_table`] from the current shards
548 /// slice. Must be called while holding the `shards` lock (read or
549 /// write) so the snapshot it captures is consistent with the
550 /// state visible to other callers.
551 ///
552 /// Filters to `Active` shards, finds the minimum weight, and
553 /// retains every active shard whose weight is within 0.1 of the
554 /// minimum. The tolerance pre-existed the perf fix (see the old
555 /// `select_shard` body) so the new table preserves the same
556 /// routing semantics.
557 ///
558 /// NaN-tolerance edge case: if every active shard has a NaN
559 /// weight the tolerance filter excludes all of them. To preserve
560 /// the pre-fix fallback (`active[0].id`), the table falls back
561 /// to *every* active id in that case.
562 fn rebuild_selection_table_locked(&self, shards: &[MappedShard]) {
563 let mut active_ids = Vec::with_capacity(shards.len());
564 for s in shards.iter() {
565 if s.state == ShardState::Active {
566 active_ids.push(s.id);
567 }
568 }
569 if active_ids.is_empty() {
570 self.selection_table.store(Arc::new(SelectionTable {
571 candidates: Box::new([]),
572 }));
573 return;
574 }
575 let min_weight = shards
576 .iter()
577 .filter(|s| s.state == ShardState::Active)
578 .map(|s| s.last_metrics.weight)
579 .fold(f64::MAX, f64::min);
580 let mut candidates: Vec<u16> = shards
581 .iter()
582 .filter(|s| {
583 s.state == ShardState::Active && (s.last_metrics.weight - min_weight).abs() < 0.1
584 })
585 .map(|s| s.id)
586 .collect();
587 if candidates.is_empty() {
588 // All-NaN fallback. Preserves the pre-fix `active[0].id`
589 // safety net; the random pick across all active shards
590 // is a strict superset of "first active."
591 candidates = active_ids;
592 }
593 self.selection_table.store(Arc::new(SelectionTable {
594 candidates: candidates.into_boxed_slice(),
595 }));
596 }
597
598 /// Set callback for shard creation.
599 pub fn set_on_shard_created<F>(&self, callback: F)
600 where
601 F: Fn(u16) + Send + Sync + 'static,
602 {
603 *self.on_shard_created.write() = Some(Box::new(callback));
604 }
605
606 /// Set callback for shard removal.
607 pub fn set_on_shard_removed<F>(&self, callback: F)
608 where
609 F: Fn(u16) + Send + Sync + 'static,
610 {
611 *self.on_shard_removed.write() = Some(Box::new(callback));
612 }
613
614 /// Get the metrics collector for a shard.
615 pub fn metrics_collector(&self, shard_id: u16) -> Option<Arc<ShardMetricsCollector>> {
616 let shards = self.shards.read();
617 shards
618 .iter()
619 .find(|s| s.id == shard_id)
620 .map(|s| s.metrics.clone())
621 }
622
623 /// Get current active shard count.
624 pub fn active_shard_count(&self) -> u16 {
625 self.active_count.load(AtomicOrdering::Acquire)
626 }
627
628 /// Get total shard count (including draining).
629 pub fn total_shard_count(&self) -> u16 {
630 self.shards.read().len() as u16
631 }
632
633 /// Select the best shard for a new event/producer.
634 ///
635 /// Pre-fix this acquired `shards.read()` and allocated two `Vec`s
636 /// per call (active filter, candidate filter) — at 10M ev/s that
637 /// was 20M allocs/sec plus a parking_lot acquire per event.
638 /// Post-fix the routable subset is pre-computed in the private
639 /// `selection_table` field and updated only at state
640 /// transitions / metric refreshes; the hot path is a single
641 /// `ArcSwap::load` + Lemire-bias-free integer mapping.
642 ///
643 /// Semantics preserved:
644 /// - Only considers active (non-draining) shards
645 /// - Prefers shards with lower weight (less loaded)
646 /// - Falls back to a random pick across all active shards if the
647 /// tolerance filter excludes everything (NaN-weight edge case)
648 /// - Returns `u16::MAX` when no active shard exists, so callers
649 /// that route via the routing table get a definite miss
650 #[inline]
651 pub fn select_shard(&self, event_hash: u64) -> u16 {
652 let table = self.selection_table.load();
653 if table.candidates.is_empty() {
654 // No active shards. Pre-fix returned `u16::MAX` from the
655 // same arm — callers that look up the id in the routing
656 // table get a definite miss, which the manager already
657 // accounts for.
658 return u16::MAX;
659 }
660 // Lemire's bias-free index mapping for any `len` that fits
661 // in u64. Pre-fix used `(event_hash as usize) %
662 // candidates.len()`, which biases low-bucket indices when
663 // `candidates.len()` is not a power of two
664 // (https://lemire.me/blog/2016/06/30/fast-random-shuffling/).
665 let idx = ((event_hash as u128 * table.candidates.len() as u128) >> 64) as usize;
666 table.candidates[idx]
667 }
668
669 /// Collect metrics from all shards and update weights.
670 pub fn collect_metrics(&self) -> Vec<ShardMetrics> {
671 let mut shards = self.shards.write();
672 let result: Vec<ShardMetrics> = shards
673 .iter_mut()
674 .map(|s| {
675 s.last_metrics = s.metrics.collect_and_reset();
676 s.last_metrics.clone()
677 })
678 .collect();
679 // Weights just changed — refresh the selection table so
680 // `select_shard` observes the new min-weight candidate set
681 // without re-scanning on every event.
682 self.rebuild_selection_table_locked(&shards);
683 result
684 }
685
686 /// Evaluate scaling based on current metrics.
687 ///
688 /// Returns a scaling decision without executing it.
689 pub fn evaluate_scaling(&self) -> ScalingDecision {
690 if !self.policy.auto_scale {
691 return ScalingDecision::None;
692 }
693
694 // Check cooldown
695 if let Some(last) = *self.last_scaling.read() {
696 if last.elapsed() < self.policy.cooldown {
697 return ScalingDecision::None;
698 }
699 }
700
701 let shards = self.shards.read();
702 let active_count = self.active_count.load(AtomicOrdering::Acquire);
703
704 // Check for scale-up triggers
705 let mut overloaded_count = 0;
706 let mut underutilized_count = 0;
707
708 // Warmup window for freshly-activated shards. A just-
709 // activated shard's `last_metrics` is the
710 // `ShardMetrics::new(id)` placeholder
711 // (`fill_ratio = 0.0, event_rate = 0`) until at least one
712 // `collect_metrics` cycle has run. The placeholder
713 // immediately matches the underutilized trigger
714 // (`fill_ratio < underutilized_threshold && event_rate
715 // == 0`), so a fresh shard added by scale-up would
716 // immediately count as underutilized on the next
717 // `evaluate_scaling` and trigger scale-down — oscillating
718 // the system. Reuse `policy.cooldown` as the warmup
719 // window: it's already the minimum gap between scaling
720 // actions, and a shard collected at least once within
721 // that window has accumulated real metrics.
722 let now = Instant::now();
723 let warmup = self.policy.cooldown;
724
725 for shard in shards.iter() {
726 if shard.state != ShardState::Active {
727 continue;
728 }
729
730 // Skip the placeholder-metrics window for freshly-
731 // activated shards. They count toward `active_count`
732 // (so the budget math stays consistent) but don't
733 // tip the overload/underutilized tallies.
734 if now.duration_since(shard.activated_at) < warmup {
735 continue;
736 }
737
738 let m = &shard.last_metrics;
739
740 // Scale-up triggers
741 if m.fill_ratio > self.policy.fill_ratio_threshold
742 || m.avg_push_latency_ns > self.policy.push_latency_threshold_ns
743 || m.avg_flush_latency_us > self.policy.flush_latency_threshold_us
744 {
745 overloaded_count += 1;
746 }
747
748 // Scale-down triggers
749 if m.fill_ratio < self.policy.underutilized_threshold && m.event_rate == 0 {
750 underutilized_count += 1;
751 }
752 }
753
754 // Scale up if more than half of shards are overloaded
755 if overloaded_count > active_count / 2 && active_count < self.policy.max_shards {
756 // Add shards proportional to overload
757 let shards_to_add = (overloaded_count / 2)
758 .max(1)
759 .min(self.policy.max_shards - active_count);
760 return ScalingDecision::ScaleUp(shards_to_add);
761 }
762
763 // Scale down if more than half of shards are underutilized
764 // and we're above minimum
765 if underutilized_count > active_count / 2 && active_count > self.policy.min_shards {
766 let shards_to_remove = (underutilized_count / 2)
767 .max(1)
768 .min(active_count - self.policy.min_shards);
769 return ScalingDecision::ScaleDown(shards_to_remove);
770 }
771
772 ScalingDecision::None
773 }
774
775 /// Validate cooldown + max-shards budget for an `add count`
776 /// scale-up request. Cheap pre-check that doesn't take the
777 /// write lock — callers re-check under the lock.
778 fn check_scale_up_budget(&self, count: u16) -> Result<(), ScalingError> {
779 let current = self.active_count.load(AtomicOrdering::Acquire);
780 let would_be = current
781 .checked_add(count)
782 .ok_or(ScalingError::AtMaxShards)?;
783 if would_be > self.policy.max_shards {
784 return Err(ScalingError::AtMaxShards);
785 }
786 let last = self.last_scaling.read();
787 if let Some(ts) = *last {
788 if ts.elapsed() < self.policy.cooldown {
789 return Err(ScalingError::InCooldown);
790 }
791 }
792 Ok(())
793 }
794
795 /// Allocate `count` shard ids and push their `MappedShard`
796 /// records into `shards` with the supplied `state`. The caller
797 /// already holds `self.shards.write()` and is responsible for
798 /// dropping it before notifying callbacks. Returns the allocated
799 /// ids in order.
800 ///
801 /// Performs the budget + cooldown re-check under the write lock,
802 /// the next_shard_id allocation, and the per-shard push. Does NOT
803 /// touch `active_count` — `scale_up` bumps it for `Active` shards;
804 /// `Provisioning` shards bump it later when `activate` fires.
805 fn allocate_shards_inner(
806 &self,
807 count: u16,
808 state: ShardState,
809 shards: &mut Vec<MappedShard>,
810 ) -> Result<Vec<u16>, ScalingError> {
811 self.allocate_shards_inner_with_policy(count, state, shards, false)
812 }
813
814 fn allocate_shards_inner_with_policy(
815 &self,
816 count: u16,
817 state: ShardState,
818 shards: &mut Vec<MappedShard>,
819 force: bool,
820 ) -> Result<Vec<u16>, ScalingError> {
821 // Re-check budget under the write lock — two concurrent
822 // scale-up callers could both pass the read-locked early
823 // check, both serialize through `shards.write()`, and both
824 // succeed without this re-check.
825 if force {
826 // Budget only — skip cooldown for operator-initiated paths.
827 let current = self.active_count.load(AtomicOrdering::Acquire);
828 let would_be = current
829 .checked_add(count)
830 .ok_or(ScalingError::AtMaxShards)?;
831 if would_be > self.policy.max_shards {
832 return Err(ScalingError::AtMaxShards);
833 }
834 } else {
835 self.check_scale_up_budget(count)?;
836 }
837
838 let first_id = self.next_shard_id.load(AtomicOrdering::Relaxed);
839 let last_needed = first_id
840 .checked_add(count.saturating_sub(1))
841 .ok_or(ScalingError::AtMaxShards)?;
842 // Reserve `u16::MAX` as a sentinel so the post-allocation
843 // store cannot wrap.
844 if last_needed == u16::MAX {
845 return Err(ScalingError::AtMaxShards);
846 }
847 // `first_id + count == last_needed + 1`. We already
848 // refused `last_needed == u16::MAX` above, so the sum is
849 // provably <= u16::MAX. Use `checked_add` anyway as a
850 // belt-and-suspenders guard: a future change that
851 // weakens the sentinel check would otherwise reach an
852 // unchecked u16 wrap here, silently rolling
853 // `next_shard_id` back to 0 and then re-issuing already-
854 // allocated ids.
855 let next_id_after = first_id
856 .checked_add(count)
857 .ok_or(ScalingError::AtMaxShards)?;
858 self.next_shard_id
859 .store(next_id_after, AtomicOrdering::Relaxed);
860
861 let mut new_ids = Vec::with_capacity(count as usize);
862 let now = Instant::now();
863 for i in 0..count {
864 let new_id = first_id + i;
865 shards.push(MappedShard {
866 id: new_id,
867 state,
868 metrics: Arc::new(ShardMetricsCollector::new(
869 new_id,
870 self.ring_buffer_capacity,
871 )),
872 drain_started: None,
873 last_metrics: ShardMetrics::new(new_id),
874 // Stamp the activation moment so `evaluate_scaling`
875 // can skip this shard until at least one collect
876 // cycle has run. Prevents the placeholder
877 // (`fill_ratio = 0, event_rate = 0`) from
878 // immediately tripping the underutilized trigger
879 // and oscillating the system.
880 activated_at: now,
881 });
882 new_ids.push(new_id);
883 }
884 Ok(new_ids)
885 }
886
887 /// Execute a scale-up operation.
888 ///
889 /// Creates new shards in the `Active` state and makes them
890 /// immediately available for routing. Use [`scale_up_provisioning`]
891 /// if upstream workers (drain / batch) need to be wired up before
892 /// the shard becomes selectable — otherwise producer pushes can
893 /// race ahead of consumer creation.
894 ///
895 /// [`scale_up_provisioning`]: Self::scale_up_provisioning
896 pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
897 // Short-circuit `count == 0` so a no-op call doesn't bump the
898 // cooldown timestamp or trip the `u16::MAX` sentinel check
899 // (which previously fired spuriously when
900 // `first_id == u16::MAX` even though zero ids were being
901 // allocated).
902 if count == 0 {
903 return Ok(Vec::new());
904 }
905
906 self.check_scale_up_budget(count)?;
907
908 let mut shards = self.shards.write();
909 let new_ids = self.allocate_shards_inner(count, ShardState::Active, &mut shards)?;
910
911 // Update counts. `fetch_add` cannot wrap here — the
912 // `check_scale_up_budget` gate above ensures `current + count <=
913 // max_shards <= u16::MAX` — but keep the ordering explicit
914 // so the next contender's cooldown re-check sees the fresh
915 // timestamp.
916 self.active_count.fetch_add(count, AtomicOrdering::Release);
917 *self.last_scaling.write() = Some(Instant::now());
918
919 // New active shards are routable immediately. Refresh the
920 // selection table before dropping the lock so any concurrent
921 // `select_shard` observes the new ids in its very next call.
922 self.rebuild_selection_table_locked(&shards);
923
924 // Drop the write lock before notifying callbacks — they
925 // are user-supplied and may take arbitrary time.
926 drop(shards);
927
928 if let Some(callback) = self.on_shard_created.read().as_ref() {
929 for &id in &new_ids {
930 callback(id);
931 }
932 }
933
934 Ok(new_ids)
935 }
936
937 /// Like [`scale_up`], but the new shards are created in the
938 /// `Provisioning` state. They receive an id and a metrics
939 /// collector, but `select_shard` will not route to them and they
940 /// are excluded from `active_shard_count` / `evaluate_scaling`
941 /// until the caller transitions each shard with [`activate`].
942 ///
943 /// Use this when upstream consumer infrastructure (drain/batch
944 /// workers, mpsc channels, etc.) must be wired up *before* the
945 /// shard becomes selectable. Without this gating, producers can
946 /// observe the shard via `select_shard`, push into its ring
947 /// buffer, and never have those events drained.
948 ///
949 /// Returns the allocated ids in order. Cooldown / `max_shards`
950 /// gating matches `scale_up` so that staged allocation cannot be
951 /// used to bypass the policy.
952 ///
953 /// [`scale_up`]: Self::scale_up
954 /// [`activate`]: Self::activate
955 pub fn scale_up_provisioning(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
956 if count == 0 {
957 return Ok(Vec::new());
958 }
959
960 self.check_scale_up_budget(count)?;
961
962 let mut shards = self.shards.write();
963 let new_ids = self.allocate_shards_inner(count, ShardState::Provisioning, &mut shards)?;
964
965 // Bump cooldown but NOT active_count — these shards are
966 // not active yet. `activate` bumps active_count when each
967 // becomes selectable.
968 *self.last_scaling.write() = Some(Instant::now());
969 drop(shards);
970 // Intentionally do NOT fire `on_shard_created` here — the
971 // callback signals "shard is live"; provisioning shards are
972 // not. `activate` fires it instead.
973 Ok(new_ids)
974 }
975
976 /// Allocate `count` Provisioning shards, bypassing the cooldown gate.
977 ///
978 /// Used by operator-initiated `manual_scale_up` paths. The
979 /// cooldown exists to prevent the auto-scaling monitor from
980 /// scaling-up too aggressively in response to transient
981 /// load spikes; a manual call from an operator is a
982 /// deliberate request that should not be rate-limited by
983 /// the auto-scaling cadence. The budget check (against
984 /// `max_shards`) still applies.
985 ///
986 /// Pre-fix `manual_scale_up(N)` looped `add_shard()` N
987 /// times, each call invoking `scale_up_provisioning(1)`
988 /// which bumped `last_scaling`. The second call then
989 /// immediately failed with `InCooldown` (default 30s
990 /// cooldown), leaving the first shard half-added and
991 /// returning an error to the operator with no rollback.
992 pub fn scale_up_provisioning_force(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
993 if count == 0 {
994 return Ok(Vec::new());
995 }
996
997 let mut shards = self.shards.write();
998 let new_ids = self.allocate_shards_inner_with_policy(
999 count,
1000 ShardState::Provisioning,
1001 &mut shards,
1002 true, // skip cooldown
1003 )?;
1004
1005 // Still bump the cooldown timestamp so the next
1006 // *auto-scaling* tick respects the cooldown floor.
1007 *self.last_scaling.write() = Some(Instant::now());
1008 drop(shards);
1009 Ok(new_ids)
1010 }
1011
1012 /// Transition a `Provisioning` shard to `Active`.
1013 ///
1014 /// Returns `Ok(true)` if a state transition actually occurred
1015 /// (Provisioning → Active) and `Ok(false)` if the shard was
1016 /// already `Active` — the latter is the idempotent path.
1017 /// Returns `InvalidPolicy` for unknown or `Draining`/`Stopped`
1018 /// shards — those states require a different lifecycle path.
1019 /// Bumps `active_count` and notifies the `on_shard_created`
1020 /// callback exactly once per real transition.
1021 ///
1022 /// Pre-fix this returned `Result<(), ScalingError>`,
1023 /// so callers (notably `ShardManager::activate_shard`) could
1024 /// not tell whether they had bumped the live count or not and
1025 /// double-incremented their own `num_shards` on every
1026 /// idempotent call.
1027 pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError> {
1028 let mut shards = self.shards.write();
1029 let shard = shards
1030 .iter_mut()
1031 .find(|s| s.id == shard_id)
1032 .ok_or_else(|| {
1033 ScalingError::InvalidPolicy(format!("activate: shard {} not found", shard_id))
1034 })?;
1035 match shard.state {
1036 ShardState::Active => return Ok(false),
1037 ShardState::Provisioning => {
1038 // Gate activation on
1039 // `active_count < max_shards`. The budget gate at
1040 // `check_scale_up_budget` only counts ALREADY-active
1041 // shards — multiple `scale_up_provisioning(1)` calls
1042 // can each pass (they don't bump `active_count`),
1043 // and an unconditional `fetch_add(1)` here would
1044 // push past `max_shards`. Subsequent
1045 // `evaluate_scaling`'s `max_shards - active_count`
1046 // arithmetic would then underflow u16 (debug-build
1047 // panic; release wraps to ~65530). Activate-time
1048 // re-checks the budget with the lock held; the
1049 // Provisioning shard stays in Provisioning state and
1050 // the caller (e.g. `add_shard_internal`'s rollback)
1051 // is responsible for tearing it down.
1052 //
1053 // The load + state mutation + `fetch_add` all happen
1054 // while we hold the `shards.write()` guard so a
1055 // concurrent `activate(distinct_id)` reads the
1056 // already-bumped `active_count` and hits
1057 // `AtMaxShards` instead of squeezing through the
1058 // window between our state update and our
1059 // `fetch_add`. Pre-fix the `fetch_add` ran after
1060 // `drop(shards)` — two activates could each see a
1061 // stale count below `max_shards` and both bump,
1062 // transiently overshooting the budget.
1063 let current = self.active_count.load(AtomicOrdering::Acquire);
1064 if current >= self.policy.max_shards {
1065 return Err(ScalingError::AtMaxShards);
1066 }
1067 shard.state = ShardState::Active;
1068 // Re-stamp `activated_at` so `evaluate_scaling`
1069 // gives this freshly-activated shard a warmup
1070 // window before counting it in the
1071 // overloaded/underutilized tallies. The
1072 // Provisioning → Active transition is the moment
1073 // traffic starts flowing; the shard's
1074 // `last_metrics` is still the
1075 // `ShardMetrics::new(id)` placeholder until the
1076 // next `collect_metrics` cycle.
1077 shard.activated_at = Instant::now();
1078 // Publish the increment while still holding the
1079 // write lock. `Release` here pairs with the
1080 // `Acquire` load above so any later activator that
1081 // takes the same lock observes our bump.
1082 self.active_count.fetch_add(1, AtomicOrdering::Release);
1083 }
1084 ShardState::Draining | ShardState::Stopped => {
1085 return Err(ScalingError::InvalidPolicy(format!(
1086 "activate: shard {} is in state {:?}, cannot activate",
1087 shard_id, shard.state
1088 )));
1089 }
1090 }
1091 // The shard just became routable. Refresh the selection
1092 // table before dropping the lock so the next `select_shard`
1093 // observes it.
1094 self.rebuild_selection_table_locked(&shards);
1095 drop(shards);
1096
1097 if let Some(callback) = self.on_shard_created.read().as_ref() {
1098 callback(shard_id);
1099 }
1100 Ok(true)
1101 }
1102
1103 /// Drain a specific shard by id, transitioning it from `Active`
1104 /// to `Draining`.
1105 ///
1106 /// Companion to `ShardManager::drain_shard`. The previous
1107 /// implementation only flipped the metrics collector's `draining`
1108 /// atomic; this version atomically updates `MappedShard.state`
1109 /// (so `select_shard` stops routing to the shard) and decrements
1110 /// `active_count` (so `evaluate_scaling`'s budget math stays
1111 /// consistent with `scale_down`). Returns an error if the shard
1112 /// is not in `Active` state, or if doing so would push the active
1113 /// count below `min_shards`.
1114 pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError> {
1115 let mut shards = self.shards.write();
1116 let current_active = self.active_count.load(AtomicOrdering::Acquire);
1117 if current_active <= self.policy.min_shards {
1118 return Err(ScalingError::AtMinShards);
1119 }
1120 let shard = shards
1121 .iter_mut()
1122 .find(|s| s.id == shard_id)
1123 .ok_or_else(|| {
1124 ScalingError::InvalidPolicy(format!("drain_specific: shard {} not found", shard_id))
1125 })?;
1126 match shard.state {
1127 ShardState::Active => {
1128 shard.state = ShardState::Draining;
1129 shard.drain_started = Some(Instant::now());
1130 shard.metrics.set_draining(true);
1131 }
1132 ShardState::Draining => return Ok(()),
1133 ShardState::Provisioning | ShardState::Stopped => {
1134 return Err(ScalingError::InvalidPolicy(format!(
1135 "drain_specific: shard {} is in state {:?}, cannot drain",
1136 shard_id, shard.state
1137 )));
1138 }
1139 }
1140 // The drained shard is no longer routable. Refresh the
1141 // selection table before dropping the lock.
1142 self.rebuild_selection_table_locked(&shards);
1143 drop(shards);
1144 self.active_count.fetch_sub(1, AtomicOrdering::Release);
1145 // Bump `last_scaling` so a subsequent `scale_up` is gated
1146 // by the cooldown floor. Pre-fix `drain_specific` removed
1147 // a shard from Active without touching `last_scaling`, so
1148 // the sequence `drain_specific(id) → scale_up(N)`
1149 // bypassed the cooldown — `scale_down` writes
1150 // `last_scaling` precisely for this reason. From the
1151 // budget-math perspective `drain_specific` IS a scale-
1152 // down (it decrements `active_count` and trips the
1153 // `min_shards` floor), so it should also gate
1154 // re-expansion the same way.
1155 *self.last_scaling.write() = Some(Instant::now());
1156 Ok(())
1157 }
1158
1159 /// Start draining shards for scale-down.
1160 ///
1161 /// Marks shards as draining so they stop receiving new events.
1162 /// Shards will be removed once they're empty.
1163 pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1164 // Early checks (may race, but avoid acquiring the write lock)
1165 let current = self.active_count.load(AtomicOrdering::Acquire);
1166 if current <= self.policy.min_shards {
1167 return Err(ScalingError::AtMinShards);
1168 }
1169
1170 let to_drain = count.min(current - self.policy.min_shards);
1171 if to_drain == 0 {
1172 return Err(ScalingError::AtMinShards);
1173 }
1174 {
1175 let last = self.last_scaling.read();
1176 if let Some(ts) = *last {
1177 if ts.elapsed() < self.policy.cooldown {
1178 return Err(ScalingError::InCooldown);
1179 }
1180 }
1181 }
1182
1183 let mut shards = self.shards.write();
1184
1185 // Re-check under the lock to prevent race conditions (double-check pattern)
1186 let current = self.active_count.load(AtomicOrdering::Acquire);
1187 if current <= self.policy.min_shards {
1188 return Err(ScalingError::AtMinShards);
1189 }
1190 let to_drain = count.min(current - self.policy.min_shards);
1191 if to_drain == 0 {
1192 return Err(ScalingError::AtMinShards);
1193 }
1194 // Re-check cooldown under the same write lock that gates
1195 // mutation — see the matching note in `scale_up`.
1196 {
1197 let last = self.last_scaling.read();
1198 if let Some(ts) = *last {
1199 if ts.elapsed() < self.policy.cooldown {
1200 return Err(ScalingError::InCooldown);
1201 }
1202 }
1203 }
1204
1205 let mut drained_ids = Vec::with_capacity(to_drain as usize);
1206
1207 // Find shards with lowest weight (least utilized) to drain
1208 let mut active_indices: Vec<_> = shards
1209 .iter()
1210 .enumerate()
1211 .filter(|(_, s)| s.state == ShardState::Active)
1212 .map(|(i, s)| (i, s.last_metrics.weight))
1213 .collect();
1214
1215 // Sort by weight (ascending - drain least utilized first)
1216 active_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
1217
1218 // Mark shards for draining
1219 for (idx, _) in active_indices.into_iter().take(to_drain as usize) {
1220 shards[idx].state = ShardState::Draining;
1221 shards[idx].drain_started = Some(Instant::now());
1222 shards[idx].metrics.set_draining(true);
1223 drained_ids.push(shards[idx].id);
1224 }
1225
1226 // Update count
1227 self.active_count
1228 .fetch_sub(to_drain, AtomicOrdering::Release);
1229 *self.last_scaling.write() = Some(Instant::now());
1230
1231 // Drained shards are no longer routable. Refresh the
1232 // selection table while still holding the lock.
1233 self.rebuild_selection_table_locked(&shards);
1234
1235 Ok(drained_ids)
1236 }
1237
1238 /// Check draining shards and finalize those that are empty.
1239 ///
1240 /// Returns IDs of shards that were stopped.
1241 ///
1242 /// This predicate looks ONLY at the ring buffer
1243 /// (`current_len` + `pushes_since_drain_start`); it does NOT
1244 /// probe the per-shard mpsc channel or the BatchWorker's
1245 /// `current_batch`. A shard that the predicate flags as empty
1246 /// can still have events queued in those two places. The
1247 /// correctness gate is therefore `bus::remove_shard_internal`,
1248 /// which awaits the BatchWorker's `JoinHandle` before
1249 /// constructing the stranded-flush batch — see that function's
1250 /// step 3 for the rationale. Tightening this predicate is a
1251 /// defense-in-depth follow-up; a stricter ring-buffer-empty
1252 /// signal here would only narrow an already-closed window.
1253 pub fn finalize_draining(&self) -> Vec<u16> {
1254 let mut shards = self.shards.write();
1255 let mut stopped = Vec::new();
1256
1257 for shard in shards.iter_mut() {
1258 if shard.state == ShardState::Draining {
1259 // Check if shard is empty by reading current_len directly,
1260 // avoiding collect_and_reset() which destructively zeros all counters.
1261 let current_len = shard.metrics.current_len.load(AtomicOrdering::Relaxed);
1262 let fill_ratio = if shard.metrics.capacity > 0 {
1263 current_len as f64 / shard.metrics.capacity as f64
1264 } else {
1265 0.0
1266 };
1267 // Previously read `events_in_window` here, which
1268 // `collect_and_reset` zeros every metrics tick. A
1269 // producer push that landed in the window between two
1270 // ticks could be silently zeroed out, so a draining
1271 // shard whose buffer transiently emptied was finalized
1272 // with a producer still attached.
1273 // `pushes_since_drain_start` is a separate counter
1274 // that is only reset by `set_draining(true)`, so any
1275 // push observed since the drain began is sticky —
1276 // exactly the signal we want.
1277 // Acquire pairs with `set_draining`'s SeqCst reset so
1278 // the load can't observe a stale value from before the
1279 // drain began. A Relaxed load here let weakly-ordered
1280 // hardware see the pre-reset count and finalize while
1281 // a producer was still pushing.
1282 let pushes_after_drain = shard.metrics.pushes_since_drain_start();
1283 if fill_ratio == 0.0 && pushes_after_drain == 0 {
1284 // Check if we've waited long enough
1285 if let Some(drain_start) = shard.drain_started {
1286 if drain_start.elapsed() > Duration::from_millis(100) {
1287 shard.state = ShardState::Stopped;
1288 stopped.push(shard.id);
1289 }
1290 }
1291 }
1292 }
1293 }
1294
1295 // Draining → Stopped transitions: no impact on the routable
1296 // set (Draining was already excluded), but rebuild for
1297 // consistency in case `last_metrics.weight` was stale on a
1298 // shard that just stopped. Cheap; bounded by shard count.
1299 if !stopped.is_empty() {
1300 self.rebuild_selection_table_locked(&shards);
1301 }
1302
1303 // Drop the write lock BEFORE notifying. The callback is
1304 // user-supplied and may re-enter the mapper (`shard_state`,
1305 // `select_shard`, `metrics_collector`, …), each of which
1306 // acquires `shards.read()`. `parking_lot::RwLock` is not
1307 // recursive, so a re-entrant read attempt while we hold a
1308 // write would deadlock. `scale_up`'s callback path already
1309 // releases its lock before calling out — mirror that
1310 // here.
1311 drop(shards);
1312
1313 if !stopped.is_empty() {
1314 if let Some(callback) = self.on_shard_removed.read().as_ref() {
1315 for &id in &stopped {
1316 callback(id);
1317 }
1318 }
1319 }
1320
1321 stopped
1322 }
1323
1324 /// Remove a specific shard from the mapper if it is in the
1325 /// `Stopped` state. Used by `ShardManager::remove_shard` so a
1326 /// per-shard cleanup doesn't disturb sibling `Stopped`
1327 /// entries — which a sequential `manual_scale_down` loop
1328 /// still needs to look up state for. Returns `true` if the
1329 /// shard existed and was Stopped (and was removed).
1330 pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool {
1331 let mut shards = self.shards.write();
1332 let before = shards.len();
1333 shards.retain(|s| !(s.id == shard_id && s.state == ShardState::Stopped));
1334 let removed = shards.len() < before;
1335 if removed {
1336 // Stopped shards weren't routable, but the underlying
1337 // slice changed length / order — rebuild so the table
1338 // doesn't dangle a stale id.
1339 self.rebuild_selection_table_locked(&shards);
1340 }
1341 removed
1342 }
1343
1344 /// Remove stopped shards from the mapper.
1345 pub fn remove_stopped_shards(&self) -> Vec<u16> {
1346 let mut shards = self.shards.write();
1347 let before = shards.len();
1348 let removed: Vec<u16> = shards
1349 .iter()
1350 .filter(|s| s.state == ShardState::Stopped)
1351 .map(|s| s.id)
1352 .collect();
1353
1354 shards.retain(|s| s.state != ShardState::Stopped);
1355
1356 if shards.len() < before {
1357 tracing::info!(
1358 removed = removed.len(),
1359 remaining = shards.len(),
1360 "Removed stopped shards"
1361 );
1362 // Same reason as `remove_specific_stopped_shard`.
1363 self.rebuild_selection_table_locked(&shards);
1364 }
1365
1366 removed
1367 }
1368
1369 /// Get the state of a specific shard.
1370 pub fn shard_state(&self, shard_id: u16) -> Option<ShardState> {
1371 self.shards
1372 .read()
1373 .iter()
1374 .find(|s| s.id == shard_id)
1375 .map(|s| s.state)
1376 }
1377
1378 /// Get all active shard IDs.
1379 pub fn active_shard_ids(&self) -> Vec<u16> {
1380 self.shards
1381 .read()
1382 .iter()
1383 .filter(|s| s.state == ShardState::Active)
1384 .map(|s| s.id)
1385 .collect()
1386 }
1387
1388 /// Get all shard IDs (including draining).
1389 pub fn all_shard_ids(&self) -> Vec<u16> {
1390 self.shards
1391 .read()
1392 .iter()
1393 .filter(|s| s.state != ShardState::Stopped)
1394 .map(|s| s.id)
1395 .collect()
1396 }
1397
1398 /// Get the scaling policy.
1399 pub fn policy(&self) -> &ScalingPolicy {
1400 &self.policy
1401 }
1402 // `set_policy` previously took `&mut self` and was unreachable
1403 // through the `Arc<ShardMapper>` that the production code holds
1404 // (`Arc::get_mut` fails once the worker pool has cloned the Arc).
1405 // The method has been removed — recreate the mapper / bus to
1406 // change the policy.
1407}
1408
1409#[cfg(test)]
1410mod tests {
1411 #![allow(
1412 clippy::disallowed_methods,
1413 reason = "test code legitimately uses std::sync::{Mutex,RwLock} for SUT setup; tests have no real poison concern"
1414 )]
1415 use super::*;
1416
1417 #[test]
1418 fn test_shard_mapper_creation() {
1419 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1420 assert_eq!(mapper.active_shard_count(), 4);
1421 assert_eq!(mapper.total_shard_count(), 4);
1422 }
1423
1424 #[test]
1425 fn test_select_shard_distributes() {
1426 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1427
1428 // Different hashes should potentially select different shards
1429 let mut selected = std::collections::HashSet::new();
1430 for i in 0..100u64 {
1431 let shard = mapper.select_shard(i * 12345);
1432 selected.insert(shard);
1433 }
1434
1435 // With 4 shards, we should hit multiple
1436 assert!(!selected.is_empty());
1437 }
1438
1439 /// Pin for perf #2: the selection table reflects the routable
1440 /// subset *exactly*, and `select_shard` reads it lock-free.
1441 /// A regression that, say, forgot to refresh after `drain_specific`
1442 /// would let `select_shard` continue routing to the draining
1443 /// shard — observable here as the drained id appearing in
1444 /// `select_shard`'s output.
1445 #[test]
1446 fn selection_table_reflects_active_subset_after_state_transitions() {
1447 // ScalingPolicy::validate rejects `min_shards == 0`; use 1
1448 // and verify we can drain down to the floor via the
1449 // official `drain_specific` API.
1450 let policy = ScalingPolicy {
1451 min_shards: 1,
1452 max_shards: 8,
1453 cooldown: Duration::from_nanos(1),
1454 ..Default::default()
1455 };
1456 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1457
1458 // Initial: all 3 shards routable.
1459 let mut seen = std::collections::HashSet::new();
1460 for i in 0..500u64 {
1461 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1462 }
1463 assert_eq!(seen, std::collections::HashSet::from([0, 1, 2]));
1464
1465 // Drain shard 1 → only 0 and 2 should be routable.
1466 mapper.drain_specific(1).unwrap();
1467 let mut seen = std::collections::HashSet::new();
1468 for i in 0..500u64 {
1469 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1470 }
1471 assert_eq!(
1472 seen,
1473 std::collections::HashSet::from([0, 2]),
1474 "drained shard must vanish from select_shard output; \
1475 a regression here would let select_shard route to a \
1476 Draining shard (blocking finalization)",
1477 );
1478
1479 // Drain shard 2 → only 0 left (at min_shards floor).
1480 mapper.drain_specific(2).unwrap();
1481 let mut seen = std::collections::HashSet::new();
1482 for i in 0..500u64 {
1483 seen.insert(mapper.select_shard(i.wrapping_mul(0x9E3779B97F4A7C15)));
1484 }
1485 assert_eq!(seen, std::collections::HashSet::from([0]));
1486 }
1487
1488 /// Pin: `select_shard` does NOT acquire `self.shards` (read or
1489 /// write). The whole perf #2 fix is that the hot path is
1490 /// `ArcSwap::load` only — no parking_lot. We pin this by
1491 /// holding a write lock on `shards` and asserting
1492 /// `select_shard` still returns immediately.
1493 #[test]
1494 fn select_shard_does_not_acquire_shards_lock() {
1495 let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1496
1497 // Hold the write lock. If `select_shard` tried to acquire
1498 // any flavor of `shards.{read,write}()` it would block
1499 // forever — parking_lot is not recursive.
1500 let _guard = mapper.shards.write();
1501 let shard = mapper.select_shard(0xDEAD_BEEF);
1502 assert!(shard < 2, "select_shard must return one of [0, 1]");
1503 }
1504
1505 /// `select_shard`'s candidate-index computation must
1506 /// be unbiased across the u64 hash space. Pre-fix used
1507 /// `hash as usize % candidates.len()`, which over-weights low
1508 /// indices when `candidates.len()` is not a power of two.
1509 /// With u64 hashes the bias is small but non-zero and
1510 /// sustains a hot-shard skew over time. Lemire's
1511 /// `(hash * len) >> 64` is unbiased.
1512 ///
1513 /// We test the unbiased property by sampling a uniform
1514 /// distribution of u64 hashes over 3 candidate shards and
1515 /// asserting each bucket gets close to 1/3 of the picks.
1516 /// Empirical bound: ±5% across 30 000 trials with a
1517 /// well-distributed input.
1518 #[test]
1519 fn select_shard_distribution_is_unbiased() {
1520 // 3 candidates: a non-power-of-2 to expose the modulo
1521 // bias the fix removes.
1522 let mapper = ShardMapper::new(3, 1024, ScalingPolicy::default()).unwrap();
1523
1524 let trials = 30_000u64;
1525 // Spread inputs uniformly across the u64 range so the
1526 // multiply-shift mapping behaves as designed.
1527 let stride = u64::MAX / trials;
1528 let mut counts = [0u64; 3];
1529 for i in 0..trials {
1530 let h = i.wrapping_mul(stride);
1531 let id = mapper.select_shard(h);
1532 counts[id as usize] += 1;
1533 }
1534
1535 let expected = (trials / 3) as i64;
1536 for (id, &count) in counts.iter().enumerate() {
1537 let diff = (count as i64 - expected).abs();
1538 let pct = (diff as f64 / expected as f64) * 100.0;
1539 assert!(
1540 pct < 5.0,
1541 "shard {} bucket has {} hits ({:.2}% off expected {}); \
1542 modulo bias would drift higher on certain shards",
1543 id,
1544 count,
1545 pct,
1546 expected
1547 );
1548 }
1549 }
1550
1551 #[test]
1552 fn test_scale_up() {
1553 // Explicitly set max_shards to allow scaling from 2 to 4
1554 let policy = ScalingPolicy {
1555 max_shards: 8,
1556 ..Default::default()
1557 };
1558 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1559
1560 let new_ids = mapper.scale_up(2).unwrap();
1561 assert_eq!(new_ids.len(), 2);
1562 assert_eq!(mapper.active_shard_count(), 4);
1563 }
1564
1565 #[test]
1566 fn test_scale_up_max_limit() {
1567 let policy = ScalingPolicy {
1568 max_shards: 4,
1569 ..Default::default()
1570 };
1571 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1572
1573 let result = mapper.scale_up(1);
1574 assert!(matches!(result, Err(ScalingError::AtMaxShards)));
1575 }
1576
1577 #[test]
1578 fn test_scale_down() {
1579 let policy = ScalingPolicy {
1580 min_shards: 1,
1581 cooldown: Duration::from_nanos(1), // Disable cooldown for test
1582 ..Default::default()
1583 };
1584 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1585
1586 let drained = mapper.scale_down(2).unwrap();
1587 assert_eq!(drained.len(), 2);
1588 assert_eq!(mapper.active_shard_count(), 2);
1589 }
1590
1591 #[test]
1592 fn test_scale_down_min_limit() {
1593 let policy = ScalingPolicy {
1594 min_shards: 4,
1595 ..Default::default()
1596 };
1597 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1598
1599 let result = mapper.scale_down(1);
1600 assert!(matches!(result, Err(ScalingError::AtMinShards)));
1601 }
1602
1603 #[test]
1604 fn test_metrics_collection() {
1605 let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1606
1607 // Record some metrics
1608 if let Some(collector) = mapper.metrics_collector(0) {
1609 collector.record_buffer_len(512);
1610 collector.record_push(5);
1611 collector.record_push(10);
1612 }
1613
1614 let metrics = mapper.collect_metrics();
1615 assert_eq!(metrics.len(), 2);
1616
1617 let shard0_metrics = metrics.iter().find(|m| m.shard_id == 0).unwrap();
1618 assert!(shard0_metrics.fill_ratio > 0.0);
1619 }
1620
1621 /// Regression: a `record_push` / `record_flush` interleaving
1622 /// with a `collect_and_reset` swap must NOT desync `(sum,
1623 /// count)`. Pre-fix `push_latency_sum_ns` and `push_count`
1624 /// were independent atomics; a tick between the two
1625 /// `fetch_add`s captured the sum without the matching count
1626 /// (or the count without the sum). The resulting `avg =
1627 /// sum.checked_div(count).unwrap_or(0)` returned 0 in window
1628 /// N (sum without count) AND 0 in window N+1 (count without
1629 /// sum) — silently zeroing the average that drives
1630 /// `evaluate_scaling`'s push-latency scale-up trigger.
1631 ///
1632 /// Post-fix `(sum, count)` is packed into one
1633 /// `AtomicU64` so the swap captures both atomically. This
1634 /// test fires N concurrent `record_push` calls and a single
1635 /// `collect_and_reset` and asserts the captured count
1636 /// matches the captured sum (i.e. `sum >= count` because
1637 /// every push contributes at least 1 ns; `sum / count` is
1638 /// well-defined for any non-zero count).
#[test]
fn record_push_collect_no_sum_count_desync() {
    use std::sync::Barrier;
    use std::thread;

    const PUSHERS: usize = 4;
    const PUSHES_PER_THREAD: usize = 1_000;
    // Every recorded latency lies in `1..=MAX_LATENCY_NS`, so any window
    // whose `(sum, count)` pair was captured consistently has an integer
    // average inside that same closed range (or 0 when count == 0).
    const MAX_LATENCY_NS: u64 = 100;

    let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
    let barrier = Arc::new(Barrier::new(PUSHERS + 1));

    let handles: Vec<_> = (0..PUSHERS)
        .map(|_| {
            let c = Arc::clone(&collector);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                for i in 0..PUSHES_PER_THREAD {
                    // Vary latencies so the average is not a constant.
                    c.record_push((i as u64 % MAX_LATENCY_NS) + 1);
                }
            })
        })
        .collect();

    // Race one collect_and_reset against the pushers.
    barrier.wait();
    let snapshot1 = collector.collect_and_reset();
    for h in handles {
        h.join().unwrap();
    }
    // After every pusher has finished, drain whatever remains.
    let snapshot2 = collector.collect_and_reset();

    let total_events = snapshot1.event_rate + snapshot2.event_rate;
    assert_eq!(
        total_events as usize,
        PUSHERS * PUSHES_PER_THREAD,
        "all pushes must be accounted for"
    );

    // `ShardMetrics` does not expose the raw (sum, count) word, so check
    // the invariant through the average. A desynced window that captured
    // a push's latency without its count shows up as an average above
    // MAX_LATENCY_NS (or 0 when the count is missing entirely). The
    // separately-incremented `events_in_window` is deliberately NOT
    // compared against the average: it can legitimately differ from the
    // packed count by an in-flight push.
    for (window, snap) in [(1, &snapshot1), (2, &snapshot2)] {
        let avg = snap.avg_push_latency_ns;
        assert!(
            avg == 0 || (1..=MAX_LATENCY_NS).contains(&avg),
            "regression: window {window} has avg_push_latency_ns={avg}, outside \
             1..={MAX_LATENCY_NS}; the captured sum does not match the captured count"
        );
    }

    // Between them the two windows captured every push in the packed
    // word. At least one window therefore has count > 0, and because each
    // push contributes >= 1 ns that window's average is >= 1.
    assert!(
        snapshot1.avg_push_latency_ns > 0 || snapshot2.avg_push_latency_ns > 0,
        "regression: {} pushes were recorded but both windows report avg=0 \
         (sum/count desync zeroed the average)",
        PUSHERS * PUSHES_PER_THREAD
    );
}
1716
/// A drained shard must disappear from `select_shard`'s candidate set.
#[test]
fn test_draining_excludes_from_selection() {
    let policy = ScalingPolicy {
        min_shards: 1,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(2, 1024, policy).unwrap();

    // Drain one shard.
    let drained = mapper.scale_down(1).unwrap();
    assert_eq!(drained.len(), 1);
    let drained_id = drained[0];

    // Only the remaining active shard may be selected.
    let active_ids = mapper.active_shard_ids();
    assert_eq!(active_ids.len(), 1);
    assert!(
        !active_ids.contains(&drained_id),
        "drained shard {drained_id} still reported as active"
    );

    // Spread the input hashes across the whole u64 range. select_shard
    // maps a hash to a table slot via `(hash * len) >> 64`, so small
    // sequential hashes all land on slot 0. That alone would never probe a
    // stale second slot holding the drained shard.
    let stride = u64::MAX / 1_000;
    for i in 0..1_000u64 {
        let hash = i.wrapping_mul(stride);
        let selected = mapper.select_shard(hash);
        assert!(
            active_ids.contains(&selected),
            "hash {hash:#x} selected shard {selected}, which is not active"
        );
        assert_ne!(
            selected, drained_id,
            "hash {hash:#x} was routed to the drained shard"
        );
    }
}
1739
/// `validate()` rejects out-of-range thresholds and inverted shard bounds.
#[test]
fn test_policy_validation() {
    // Fill ratio is a fraction, so 1.5 is out of range.
    let bad_fill_ratio = ScalingPolicy {
        fill_ratio_threshold: 1.5,
        ..Default::default()
    };
    // min_shards above max_shards; only normalize() would repair this.
    let inverted_bounds = ScalingPolicy {
        min_shards: 10,
        max_shards: 5,
        ..Default::default()
    };

    for policy in [bad_fill_ratio, inverted_bounds] {
        assert!(policy.validate().is_err());
    }
}
1756
/// `normalize()` lifts `max_shards` up to `min_shards` when the bounds are inverted.
#[test]
fn test_policy_normalize_auto_adjusts_max_shards() {
    // max_shards (2) sits below min_shards (8).
    let normalized = ScalingPolicy {
        min_shards: 8,
        max_shards: 2,
        ..Default::default()
    }
    .normalize();

    assert_eq!(
        normalized.max_shards, 8,
        "max_shards should be adjusted to min_shards"
    );
    let verdict = normalized.validate();
    assert!(verdict.is_ok(), "normalized policy should be valid");
}
1776
/// Regression: BUG_REPORT.md #7 — shard ids must never be recycled.
///
/// `scale_up` used to allocate new ids as `shards.iter().max() + 1`. Once
/// the highest-numbered shard had been drained and removed, that formula
/// handed out the same id again. Any external metric/checkpoint system
/// keyed on shard id would then merge two distinct shard lifetimes. The
/// fix allocates from a monotonic `next_shard_id` counter.
#[test]
fn scale_up_does_not_reuse_ids_after_remove() {
    let mapper = ShardMapper::new(
        2,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Boot ids are 0 and 1, so growing by two must issue 2 and 3.
    assert_eq!(mapper.scale_up(2).unwrap(), vec![2, 3]);

    // scale_down chooses its victim by lowest weight, so the drained id is
    // not under test control. Whichever it is, push it to Stopped and
    // remove it.
    let drained_id = mapper.scale_down(1).unwrap()[0];
    if let Some(victim) = mapper
        .shards
        .write()
        .iter_mut()
        .find(|s| s.id == drained_id)
    {
        victim.state = ShardState::Stopped;
    }
    assert_eq!(mapper.remove_stopped_shards(), vec![drained_id]);

    // A max-plus-one allocator could revive `drained_id` when it was the
    // highest id present (e.g. removing 3 from 0/1/2/3 would make 3 the
    // next id again). The monotonic counter already stands at 4, so that
    // is the only acceptable answer whichever shard was drained.
    let new_ids = mapper.scale_up(1).unwrap();
    assert_eq!(
        new_ids,
        vec![4],
        "shard id {drained_id} was just removed; reusing any \
         previously-issued id would merge two distinct shard \
         lifetimes in external systems"
    );
}
1828
/// `normalize()` leaves already-consistent bounds untouched.
#[test]
fn test_policy_normalize_preserves_valid_config() {
    // min (4) <= max (16): nothing to repair.
    let normalized = ScalingPolicy {
        min_shards: 4,
        max_shards: 16,
        ..Default::default()
    }
    .normalize();

    assert_eq!((normalized.min_shards, normalized.max_shards), (4, 16));
}
1842
/// `ShardMapper::new` normalizes its policy internally.
///
/// It therefore accepts `min_shards: 4` even where the default
/// `max_shards` (CPU count) is smaller.
#[test]
fn test_shard_mapper_normalizes_policy() {
    let policy = ScalingPolicy {
        min_shards: 4,
        ..Default::default()
    };

    // Must hold on hosts with fewer than 4 CPUs as well.
    assert!(
        ShardMapper::new(4, 1024, policy).is_ok(),
        "ShardMapper should normalize policy automatically"
    );
}
1859
/// An initial shard count above the default `max_shards` (CPU count) raises
/// `max_shards` instead of failing construction.
#[test]
fn test_shard_mapper_adjusts_max_shards_to_initial_count() {
    // Eight initial shards must work even on a 2-core host.
    let mapper = match ShardMapper::new(8, 1024, ScalingPolicy::default()) {
        Ok(mapper) => mapper,
        Err(_) => panic!("ShardMapper should adjust max_shards to initial_shards"),
    };

    assert_eq!(mapper.active_shard_count(), 8);

    // The stored policy reflects the adjustment.
    let effective_max = mapper.policy().max_shards;
    assert!(
        effective_max >= 8,
        "max_shards should be at least initial_shards"
    );
}
1882
/// Concurrent `scale_up(3)` calls from 5 → max 10 must never overshoot
/// `max_shards`.
///
/// The active count must also match exactly what the successful calls
/// reported allocating.
#[test]
fn test_scale_up_max_shards_concurrent() {
    use std::sync::Arc;
    use std::thread;

    let policy = ScalingPolicy {
        max_shards: 10,
        cooldown: Duration::from_nanos(1), // Effectively disables cooldown.
        ..Default::default()
    };
    let mapper = Arc::new(ShardMapper::new(5, 1024, policy).unwrap());

    // Five threads each try to add three shards.
    let handles: Vec<_> = (0..5)
        .map(|_| {
            let m = Arc::clone(&mapper);
            thread::spawn(move || m.scale_up(3))
        })
        .collect();
    let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();

    // Each call may succeed, hit the shard cap, or (despite the 1ns
    // cooldown) land inside the cooldown of a call that just finished.
    // Any other outcome is a bug.
    for r in &results {
        assert!(
            matches!(
                r,
                Ok(_) | Err(ScalingError::AtMaxShards) | Err(ScalingError::InCooldown)
            ),
            "unexpected scale_up result: {r:?}"
        );
    }

    // The first caller to win sees no prior scaling and 5 + 3 <= 10, so
    // at least one call must succeed. (With only 5 slots of headroom, two
    // full 3-shard grants cannot both fit.)
    let successes = results.iter().filter(|r| r.is_ok()).count();
    assert!(
        successes >= 1,
        "the first scale_up(3) fits the budget and must succeed; got {results:?}"
    );

    // Final count should not exceed max_shards ...
    let active = mapper.active_shard_count() as usize;
    assert!(active <= 10, "should never exceed max_shards, got {}", active);

    // ... and must equal the initial 5 plus every id that was handed out.
    let added: usize = results
        .iter()
        .filter_map(|r| r.as_ref().ok())
        .map(|ids| ids.len())
        .sum();
    assert_eq!(
        active,
        5 + added,
        "active shard count diverged from the ids returned by successful scale_up calls"
    );
}
1926
/// `activate()` must refuse to push `active_count` past `max_shards`, even
/// when several Provisioning shards were admitted earlier.
///
/// Before the fix, the budget gate (`check_scale_up_budget`) counted only
/// shards that were ALREADY active. Several `scale_up_provisioning` calls
/// could therefore all pass, and each later `activate()` bumped
/// `active_count` unconditionally past the cap.
///
/// Setup: at the budget edge, allocate two Provisioning shards one after
/// the other. Both pass the gate because `active_count` has not moved. The
/// first `activate()` fills the budget; the second must report
/// `AtMaxShards` instead of overflowing.
#[test]
fn activate_rejects_when_active_count_would_exceed_max_shards() {
    let mapper = ShardMapper::new(
        3,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 4,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // active_count = 3, max = 4: both allocations see 3 and are admitted.
    let first = mapper.scale_up_provisioning(1).unwrap();
    // A fast build can finish the first allocation within the 1ns cooldown;
    // back off briefly and retry until the second allocation goes through.
    let second = loop {
        let attempt = mapper.scale_up_provisioning(1);
        if let Err(ScalingError::InCooldown) = attempt {
            std::thread::sleep(Duration::from_nanos(50));
            continue;
        }
        break attempt.unwrap_or_else(|e| {
            panic!("unexpected error from scale_up_provisioning: {:?}", e)
        });
    };
    assert_eq!(first.len(), 1);
    assert_eq!(second.len(), 1);

    // 3 → 4 reaches the cap exactly.
    mapper
        .activate(first[0])
        .expect("first activate must succeed");
    assert_eq!(mapper.active_shard_count(), 4);

    // Going to 5 would overshoot, so this one must be refused.
    let refusal = mapper
        .activate(second[0])
        .expect_err("second activate must reject");
    assert!(
        matches!(refusal, ScalingError::AtMaxShards),
        "expected AtMaxShards, got {:?}",
        refusal
    );
    // The refusal left the count at the cap.
    assert_eq!(mapper.active_shard_count(), 4);
}
1985
/// Pin: under contention, `active_count` never exceeds `max_shards`.
///
/// Before the fix, `activate` released the shards write-lock BEFORE the
/// `fetch_add(1)`. Two concurrent activators could then both pass the
/// budget gate (each reading the pre-bump count) and both bump, overshooting
/// the cap by one.
#[test]
fn concurrent_activate_never_exceeds_max_shards() {
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 200;

    for iter in 0..ITERATIONS {
        let policy = ScalingPolicy {
            min_shards: 1,
            max_shards: 4,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        };
        // Start at active=3 with two Provisioning candidates, one per thread.
        // Pre-fix interleaving: A reads 3, validates, marks Active, unlocks;
        // B reads 3, validates, marks Active, unlocks; A bumps to 4, B to 5.
        // Post-fix, B sees 4 while holding the lock and answers AtMaxShards.
        let mapper = Arc::new(ShardMapper::new(3, 1024, policy).unwrap());
        let ids_a = mapper.scale_up_provisioning(1).unwrap();
        // The second allocation can land inside the 1ns cooldown on a fast
        // build; keep backing off until it is admitted.
        let ids_b = loop {
            match mapper.scale_up_provisioning(1) {
                Err(ScalingError::InCooldown) => {
                    std::thread::sleep(Duration::from_micros(10));
                }
                Err(e) => panic!("unexpected error: {:?}", e),
                Ok(v) => break v,
            }
        };
        assert_eq!(ids_a.len(), 1);
        assert_eq!(ids_b.len(), 1);

        // Both threads wait at the barrier, then call activate concurrently.
        let barrier = Arc::new(Barrier::new(2));
        let race = |id| {
            let m = Arc::clone(&mapper);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                m.activate(id)
            })
        };
        let h1 = race(ids_a[0]);
        let h2 = race(ids_b[0]);
        let r1 = h1.join().expect("thread A panicked");
        let r2 = h2.join().expect("thread B panicked");

        // Exactly one winner and exactly one AtMaxShards.
        let (ok_count, at_max_count) =
            [&r1, &r2]
                .iter()
                .fold((0usize, 0usize), |(ok, at_max), r| match r {
                    Ok(_) => (ok + 1, at_max),
                    Err(ScalingError::AtMaxShards) => (ok, at_max + 1),
                    Err(_) => (ok, at_max),
                });
        assert_eq!(
            ok_count, 1,
            "iter {}: expected exactly one Ok, got r1={:?} r2={:?}",
            iter, r1, r2
        );
        assert_eq!(
            at_max_count, 1,
            "iter {}: expected exactly one AtMaxShards, got r1={:?} r2={:?}",
            iter, r1, r2
        );

        // The count is within the cap after the race.
        assert!(
            mapper.active_shard_count() <= 4,
            "iter {}: active_count={} exceeded max_shards=4",
            iter,
            mapper.active_shard_count(),
        );
    }
}
2081
/// Two concurrent `scale_up(1)` calls must never both succeed within one
/// cooldown window.
///
/// Before the fix, the cooldown check ran under a read lock that was
/// dropped *before* the mutating write lock was taken. Two threads could
/// both observe `last_scaling = None` (or a stale value), take the write
/// lock in turn, and both succeed. That slipped past the cooldown floor
/// and, in a scenario bounded by `max_shards`, potentially past the cap too.
///
/// Pin: every one of `ITERATIONS` two-thread races ends with exactly one
/// success and one `InCooldown`.
#[test]
fn cooldown_is_enforced_under_concurrent_scale_up() {
    use std::sync::Arc;
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 1_000;

    for iter in 0..ITERATIONS {
        // A 60s cooldown cannot elapse between the two racing calls.
        let policy = ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: Duration::from_secs(60),
            ..Default::default()
        };
        let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
        let barrier = Arc::new(Barrier::new(2));

        let spawn_racer = || {
            let m = Arc::clone(&mapper);
            let b = Arc::clone(&barrier);
            thread::spawn(move || {
                b.wait();
                m.scale_up(1)
            })
        };
        let h1 = spawn_racer();
        let h2 = spawn_racer();

        let r1 = h1.join().unwrap();
        let r2 = h2.join().unwrap();

        let mut oks = 0;
        let mut cooldowns = 0;
        for r in [&r1, &r2] {
            match r {
                Ok(_) => oks += 1,
                Err(ScalingError::InCooldown) => cooldowns += 1,
                Err(_) => {}
            }
        }

        assert_eq!(
            oks, 1,
            "iter {iter}: expected exactly one Ok, got r1={r1:?}, r2={r2:?}"
        );
        assert_eq!(
            cooldowns, 1,
            "iter {iter}: expected exactly one InCooldown, got r1={r1:?}, r2={r2:?}"
        );
        assert_eq!(
            mapper.active_shard_count(),
            3,
            "iter {iter}: cooldown violated — both calls mutated state (shard count {})",
            mapper.active_shard_count()
        );
    }
}
2152
/// `scale_up` must reject an allocation whose ids would run past the top
/// of the `u16` range.
///
/// The rejected request must not consume ids.
#[test]
fn test_scale_up_overflow_protection() {
    // The monotonic id allocator advances by one per shard regardless of
    // removals. Rather than creating ~65k shards, position `next_shard_id`
    // directly near `u16::MAX`.
    let policy = ScalingPolicy {
        max_shards: u16::MAX,
        ..Default::default()
    };
    let mapper = ShardMapper::new(1, 1024, policy).unwrap();

    // Next id to be issued is u16::MAX - 1. Three ids would need
    // {MAX-1, MAX, MAX+1}; the last is unrepresentable in `u16`, so the
    // whole request must be rejected.
    mapper
        .next_shard_id
        .store(u16::MAX - 1, AtomicOrdering::Relaxed);

    let result = mapper.scale_up(3);
    assert!(
        matches!(result, Err(ScalingError::AtMaxShards)),
        "scale_up(3) near the id ceiling must be rejected, got {result:?}"
    );

    // A single shard still fits. The rejected call above must not have
    // advanced the allocator, so the id is exactly MAX - 1.
    match mapper.scale_up(1) {
        Ok(ids) => assert_eq!(
            ids,
            vec![u16::MAX - 1],
            "rejected scale_up(3) must not consume ids"
        ),
        Err(e) => panic!("scale_up(1) with one id of headroom should succeed, got {e:?}"),
    }
}
2180
/// With `auto_scale` off, `evaluate_scaling` never recommends a change.
#[test]
fn test_evaluate_scaling_auto_scale_disabled() {
    let manual_only = ScalingPolicy {
        auto_scale: false,
        ..Default::default()
    };
    let mapper = ShardMapper::new(4, 1024, manual_only).unwrap();

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2192
/// A scaling action inside the cooldown window suppresses further decisions.
#[test]
fn test_evaluate_scaling_in_cooldown() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            cooldown: Duration::from_secs(60),
            ..Default::default()
        },
    )
    .unwrap();

    // Pretend a scaling action just happened.
    let just_now = Instant::now();
    *mapper.last_scaling.write() = Some(just_now);

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2207
/// Fill ratios above `fill_ratio_threshold` trigger a scale-up.
#[test]
fn test_evaluate_scaling_scale_up_on_high_fill_ratio() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            fill_ratio_threshold: 0.7,
            max_shards: 8,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Every shard sits at 90% fill, above the 70% threshold.
    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|s| s.last_metrics.fill_ratio = 0.9);

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleUp(_)
    ));
}
2229
/// Push latency above `push_latency_threshold_ns` triggers a scale-up.
#[test]
fn test_evaluate_scaling_scale_up_on_high_latency() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            push_latency_threshold_ns: 10,
            max_shards: 8,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Every shard reports 100ns pushes, ten times the threshold.
    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|s| s.last_metrics.avg_push_latency_ns = 100);

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleUp(_)
    ));
}
2251
/// Idle shards (low fill, zero events) above `min_shards` trigger a
/// scale-down.
#[test]
fn test_evaluate_scaling_scale_down_on_underutilized() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            underutilized_threshold: 0.2,
            min_shards: 2,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Every shard is at 5% fill (below 20%) with no traffic.
    mapper.shards.write().iter_mut().for_each(|s| {
        s.last_metrics.fill_ratio = 0.05;
        s.last_metrics.event_rate = 0;
    });

    assert!(matches!(
        mapper.evaluate_scaling(),
        ScalingDecision::ScaleDown(_)
    ));
}
2274
/// Regression: a freshly-activated shard must NOT count toward the
/// underutilized tally on the next `evaluate_scaling`.
///
/// Before the fix, the new shard's `last_metrics` was the
/// `ShardMetrics::new(id)` placeholder (`fill_ratio = 0.0, event_rate = 0`).
/// That matched the underutilized trigger immediately and made the system
/// oscillate: scale-up → next tick scale-down → next tick scale-up.
///
/// After the fix, `MappedShard.activated_at` is stamped at the
/// Provisioning → Active transition (and at scale-up construction for
/// `Active`-from-create shards). `evaluate_scaling` skips shards within
/// `policy.cooldown` of activation.
///
/// The test is decisive. It builds a fleet where the fresh shard's vote
/// alone would tip the underutilized majority and checks that
/// `evaluate_scaling` holds. It then ages that same shard past the warmup
/// window and checks that scale-down fires.
#[test]
fn freshly_added_shard_skipped_from_evaluate_scaling_warmup() {
    // A short cooldown (which is also the warmup window) lets "outside
    // warmup" be pinned with a small, `checked_sub`-safe offset (200ms).
    // Production stamps boot shards 1 hour in the past. On hosts with
    // sub-1h uptime that subtraction falls back to `Instant::now()`
    // (Windows `Instant` is bounded by boot), which would put every shard
    // inside the warmup window. The boot shards are therefore overridden
    // explicitly, so the test does not depend on system uptime.
    let policy = ScalingPolicy {
        underutilized_threshold: 0.2,
        min_shards: 1,
        cooldown: Duration::from_millis(100),
        ..Default::default()
    };
    let mapper = ShardMapper::new(3, 1024, policy).unwrap();

    let old_ts = Instant::now()
        .checked_sub(Duration::from_millis(200))
        .expect("test host uptime should exceed 200ms");

    // Phase 1: every shard looks underutilized. Boot shards 0 and 1 are
    // safely outside warmup; shard 2 was "just activated".
    {
        let mut shards = mapper.shards.write();
        for shard in shards.iter_mut() {
            shard.last_metrics.fill_ratio = 0.05;
            shard.last_metrics.event_rate = 0;
            shard.activated_at = old_ts;
        }
        shards[2].activated_at = Instant::now();
    }

    // Sanity: only shard 2 falls inside the warmup window.
    let now = Instant::now();
    let warmup_excluded: Vec<u16> = mapper
        .shards
        .read()
        .iter()
        .filter(|s| now.duration_since(s.activated_at) < mapper.policy.cooldown)
        .map(|s| s.id)
        .collect();
    assert_eq!(
        warmup_excluded,
        vec![2u16],
        "only shard id 2 (just-stamped) should be within the warmup window; \
         boot shards are stamped 200ms in the past (outside the 100ms cooldown)"
    );

    // Both boot shards are genuinely idle (2 of 3 > 3/2), so scale-down
    // fires whether or not the fresh shard votes.
    let decision = mapper.evaluate_scaling();
    assert!(
        matches!(decision, ScalingDecision::ScaleDown(_)),
        "scale-down should fire from the two idle boot shards, got {:?}",
        decision,
    );

    // Phase 2: make boot shard 1 busy. Fill 0.5 is above the 0.2
    // underutilized threshold and below the default 70% scale-up fill
    // threshold, and it carries traffic. Now shard 0 is the only mature
    // idle voter. If fresh shard 2's placeholder metrics were counted,
    // they would supply the second vote (2 of 3) and force ScaleDown.
    // With the warmup skip, one idle shard is not a majority.
    {
        let mut shards = mapper.shards.write();
        shards[1].last_metrics.fill_ratio = 0.5;
        shards[1].last_metrics.event_rate = 1_000;
    }
    let decision = mapper.evaluate_scaling();
    assert!(
        matches!(decision, ScalingDecision::None),
        "regression: the still-warming shard 2 voted as underutilized \
         and tipped the decision; got {:?}",
        decision,
    );

    // Phase 3: age shard 2 past warmup. Its idle metrics now vote and
    // restore the 2-of-3 majority. This shows the Phase 2 result came from
    // the warmup skip alone.
    {
        let mut shards = mapper.shards.write();
        shards[2].activated_at = old_ts;
    }
    let decision = mapper.evaluate_scaling();
    assert!(
        matches!(decision, ScalingDecision::ScaleDown(_)),
        "once outside warmup, shard 2's idle metrics must count again; got {:?}",
        decision,
    );
}
2364
/// A fleet already at `max_shards` gets no scale-up, however full it is.
#[test]
fn test_evaluate_scaling_no_scale_up_at_max() {
    let at_capacity = ScalingPolicy {
        fill_ratio_threshold: 0.7,
        max_shards: 4,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    // Four shards with max_shards = 4: no headroom.
    let mapper = ShardMapper::new(4, 1024, at_capacity).unwrap();

    mapper
        .shards
        .write()
        .iter_mut()
        .for_each(|s| s.last_metrics.fill_ratio = 0.9);

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2386
/// A fleet already at `min_shards` gets no scale-down, however idle it is.
#[test]
fn test_evaluate_scaling_no_scale_down_at_min() {
    let at_floor = ScalingPolicy {
        underutilized_threshold: 0.2,
        min_shards: 4,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    // Four shards with min_shards = 4: nothing can be shed.
    let mapper = ShardMapper::new(4, 1024, at_floor).unwrap();

    mapper.shards.write().iter_mut().for_each(|s| {
        s.last_metrics.fill_ratio = 0.05;
        s.last_metrics.event_rate = 0;
    });

    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2409
/// Draining shards are excluded from scaling evaluation.
///
/// Their high fill must not provoke a scale-up.
#[test]
fn test_evaluate_scaling_ignores_draining_shards() {
    let mapper = ShardMapper::new(
        4,
        1024,
        ScalingPolicy {
            fill_ratio_threshold: 0.7,
            max_shards: 8,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Every shard is over the fill threshold but marked Draining.
    mapper.shards.write().iter_mut().for_each(|s| {
        s.last_metrics.fill_ratio = 0.9;
        s.state = ShardState::Draining;
    });

    // No eligible shards, hence no decision.
    assert!(matches!(mapper.evaluate_scaling(), ScalingDecision::None));
}
2433
/// `ShardMetrics::new` records the id and starts idle and not draining.
#[test]
fn test_shard_metrics_new() {
    let fresh = ShardMetrics::new(5);
    assert_eq!(
        (fresh.shard_id, fresh.event_rate, fresh.draining),
        (5, 0, false)
    );
    assert_eq!(fresh.fill_ratio, 0.0);
}
2442
/// A loaded, non-draining shard gets a strictly positive weight.
#[test]
fn test_shard_metrics_compute_weight() {
    let mut loaded = ShardMetrics {
        fill_ratio: 0.5,
        avg_push_latency_ns: 100,
        event_rate: 1_000_000,
        ..ShardMetrics::new(0)
    };

    loaded.compute_weight();
    assert!(loaded.weight > 0.0);
}
2453
/// A draining shard is pinned to the maximum weight (least attractive).
#[test]
fn test_shard_metrics_draining_max_weight() {
    let mut draining = ShardMetrics {
        draining: true,
        ..ShardMetrics::new(0)
    };
    draining.compute_weight();
    assert_eq!(draining.weight, f64::MAX);
}
2461
/// Each `ScalingDecision` variant's `Debug` output names the variant.
#[test]
fn test_scaling_decision_debug() {
    let cases = [
        (ScalingDecision::None, "None"),
        (ScalingDecision::ScaleUp(2), "ScaleUp"),
        (ScalingDecision::ScaleDown(1), "ScaleDown"),
    ];

    for (decision, variant_name) in cases {
        assert!(format!("{:?}", decision).contains(variant_name));
    }
}
2472
/// Regression: BUG_REPORT.md #46 — a shard allocated by
/// `scale_up_provisioning` must stay invisible to `select_shard` until
/// `activate` is called.
///
/// This invariant is what lets `EventBus::add_shard_internal` wire up drain
/// workers before any producer can land in the new ring buffer.
#[test]
fn provisioning_shard_is_not_selectable_until_activated() {
    let mapper = ShardMapper::new(
        2,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Boot ids are 0 and 1, so the provisioning shard gets id 2.
    assert_eq!(mapper.scale_up_provisioning(1).unwrap(), vec![2]);

    // Provisioning shards are not part of active accounting.
    assert_eq!(mapper.active_shard_count(), 2);
    assert_eq!(mapper.shard_state(2), Some(ShardState::Provisioning));

    // Probe select_shard with hashes spread over the whole u64 range. The
    // unbiased Lemire-style mapping computes `(hash * len) >> 64`, so
    // small sequential inputs would all collapse onto slot 0. Production
    // callers feed uniform `xxh3_64` hashes, which this sampling mimics.
    let stride = u64::MAX / 10_000;
    let sample_targets = || {
        (0u64..10_000)
            .map(|i| mapper.select_shard(i.wrapping_mul(stride)))
            .collect::<std::collections::HashSet<u16>>()
    };

    let before = sample_targets();
    assert!(
        !before.contains(&2),
        "provisioning shard 2 was selected — \
         producers would push into a ring buffer with no consumer (#46)"
    );
    assert_eq!(
        before,
        [0u16, 1].into_iter().collect::<std::collections::HashSet<u16>>()
    );

    // Once activated, the shard joins the selectable set.
    mapper.activate(2).unwrap();
    assert_eq!(mapper.shard_state(2), Some(ShardState::Active));
    assert_eq!(mapper.active_shard_count(), 3);

    assert!(
        sample_targets().contains(&2),
        "after activate, shard 2 should be a valid select_shard target"
    );
}
2532
/// Regression: BUG_REPORT.md #51 — with no `Active` shard left (e.g. every
/// shard is Draining or Provisioning), `select_shard` must NOT fall back to
/// a Draining shard.
///
/// Pushing into a draining shard increments `pushes_since_drain_start` and
/// blocks finalization indefinitely.
#[test]
fn select_shard_does_not_fall_back_to_draining() {
    // The all-draining state is forced below by mutating the shard list
    // directly, bypassing `scale_down`'s min_shards floor. A regular
    // min_shards of 1 is therefore enough here.
    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(2, 1024, policy).unwrap();

    // Force every shard into Draining. This models a state reachable
    // through `drain_specific` calls. Direct mutation does not trigger the
    // selection-table rebuild the public API performs, so rebuild
    // explicitly, as `drain_specific` would have.
    {
        let mut shards = mapper.shards.write();
        for s in shards.iter_mut() {
            s.state = ShardState::Draining;
            s.metrics.set_draining(true);
        }
        mapper.rebuild_selection_table_locked(&shards);
    }

    // The fallback must return the out-of-bounds sentinel `u16::MAX`
    // instead of a draining id (0 or 1). Upstream `resolve_idx` finds no
    // match for `u16::MAX` and surfaces `Unrouted` (#44), the correct
    // "no destination, do not push" signal.
    //
    // Hashes are spread over the full u64 range (plus both extremes). The
    // multiply-high slot mapping sends small sequential hashes to slot 0
    // only, so they would not probe the rest of the table.
    let stride = u64::MAX / 1_000;
    let hashes = (0u64..1_000)
        .map(|i| i.wrapping_mul(stride))
        .chain([0, u64::MAX]);
    for hash in hashes {
        let picked = mapper.select_shard(hash);
        assert_eq!(
            picked,
            u16::MAX,
            "hash {:#x}: fallback returned a draining shard ({}); pushes there would \
             deadlock finalize_draining (#51)",
            hash,
            picked
        );
    }
}
2586
/// Regression: BUG_REPORT.md #32 — `scale_up(0)` must be a pure no-op.
///
/// It used to bump the cooldown timestamp. It could also fail spuriously at
/// `u16::MAX` even though it allocated nothing.
#[test]
fn scale_up_zero_is_a_noop() {
    let mapper = ShardMapper::new(
        2,
        1024,
        ScalingPolicy {
            cooldown: Duration::from_secs(60),
            ..Default::default()
        },
    )
    .unwrap();

    // Simulate a scaling action that just happened, so the cooldown is live.
    *mapper.last_scaling.write() = Some(Instant::now());
    let stamp_before = *mapper.last_scaling.read();

    // Even inside the cooldown, a zero-sized request must succeed with no
    // ids and leave `last_scaling` alone.
    let r = mapper.scale_up(0);
    match &r {
        Ok(ids) => assert_eq!(ids.len(), 0),
        Err(_) => panic!("scale_up(0) should succeed as a no-op, got {r:?}"),
    }
    let stamp_after = *mapper.last_scaling.read();
    assert_eq!(
        stamp_before, stamp_after,
        "scale_up(0) bumped cooldown timestamp"
    );

    // With the allocator parked at u16::MAX, a zero-sized request still
    // must not trip the sentinel check.
    mapper
        .next_shard_id
        .store(u16::MAX, AtomicOrdering::Relaxed);
    assert!(mapper.scale_up(0).is_ok());
}
2619
/// Regression: `drain_specific` must record `last_scaling` so that a
/// follow-up `scale_up` is held back by the cooldown.
///
/// Before the fix only `scale_down` wrote `last_scaling`. As a result,
/// the sequence `drain_specific(id)` → `scale_up(N)` skipped the
/// cooldown entirely. That was wrong: `scale_down` and
/// `drain_specific` both decrement `active_count`, so the budget math
/// should treat them the same way.
#[test]
fn drain_specific_bumps_last_scaling_so_scale_up_respects_cooldown() {
    // The 60s cooldown is far longer than this test runs, so it cannot
    // lapse between the drain and the scale_up below. The test never
    // waits on it, so the length adds no runtime.
    let mapper = ShardMapper::new(
        3,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 8,
            cooldown: Duration::from_secs(60),
            ..Default::default()
        },
    )
    .unwrap();

    // Nothing has scaled yet.
    assert!(mapper.last_scaling.read().is_none());

    let drain_began = Instant::now();
    mapper.drain_specific(0).unwrap();
    let recorded = mapper
        .last_scaling
        .read()
        .expect("drain_specific must record a `last_scaling` timestamp");
    // The timestamp must come from this drain, not from some stale value.
    assert!(
        recorded >= drain_began,
        "last_scaling must be bumped to a current Instant (got {:?}, before was {:?})",
        recorded,
        drain_began
    );

    // The property this test exists for: the very next scale_up is
    // rejected by the cooldown gate. Pre-fix it went straight through
    // because drain_specific never wrote `last_scaling`.
    let err = mapper
        .scale_up(1)
        .expect_err("scale_up immediately after drain_specific must hit cooldown");
    assert!(
        matches!(err, ScalingError::InCooldown),
        "expected InCooldown after drain_specific, got {:?}",
        err
    );
}
2669
/// Regression: BUG_REPORT.md #48 — `drain_specific` must
/// transition the shard's `MappedShard.state` to `Draining` so
/// that `select_shard` stops routing to it. The previous
/// `drain_shard` only flipped a metrics atomic.
///
/// Never picking the drained shard is not enough. Shards 1 and 2 stay
/// `Active`, so every pick must land on an `Active` shard. Returning
/// the `u16::MAX` "unrouted" sentinel, or any other non-active id,
/// would also be a regression, and a plain `!= 0` check would let it
/// through.
#[test]
fn drain_specific_takes_shard_out_of_select() {
    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = ShardMapper::new(3, 1024, policy).unwrap();

    // Drain shard 0.
    mapper.drain_specific(0).unwrap();
    assert_eq!(mapper.shard_state(0), Some(ShardState::Draining));
    assert_eq!(mapper.active_shard_count(), 2);

    // Across many hashes, select_shard must never pick id 0 and must
    // always pick a shard that is still Active.
    for hash in 0u64..10_000 {
        let picked = mapper.select_shard(hash);
        assert_ne!(
            picked, 0,
            "select_shard returned a Draining shard id 0 — \
             producer pushes there would block finalize_draining (#48)"
        );
        assert_eq!(
            mapper.shard_state(picked),
            Some(ShardState::Active),
            "select_shard({hash}) returned shard {picked}, which is not Active, \
             while shards 1 and 2 are still routable (#48)"
        );
    }
}
2699
/// Regression: BUG_REPORT.md #33 — `set_draining(true)` and a
/// concurrent `record_push` race on `pushes_since_drain_start`.
/// The race itself can't be eliminated without a CAS loop on
/// the counter (which would penalize the hot path). The best we can
/// pin is a bound: once the dust settles, the counter is never
/// larger than the number of pushes that genuinely overlapped the
/// transition.
///
/// This catches a regression where `set_draining(true)` stops zeroing
/// the counter: the 50 pre-drain pushes would survive the transition.
///
/// Scope limits:
/// - The draining flag itself is not inspected here.
/// - Every iteration builds a fresh collector, so each collector sees
///   exactly one transition. The test does not cover drift across
///   repeated transitions on the same collector.
/// - The iterations only vary the thread interleaving of that single
///   transition.
#[test]
fn set_draining_resets_counter_under_concurrent_pushes() {
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 200;
    const PUSHERS: usize = 4;

    for _ in 0..ITERATIONS {
        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));

        // Sanity: counter starts at zero.
        assert_eq!(collector.pushes_since_drain_start(), 0);

        // Pre-load with a noticeable amount of "before drain"
        // pushes so the reset has work to do.
        for _ in 0..50 {
            collector.record_push(1);
        }
        assert_eq!(collector.pushes_since_drain_start(), 50);

        // Race: each pusher issues one `record_push` while this thread
        // calls `set_draining(true)`, all released by the same barrier.
        // The counter must end up bounded by PUSHERS (the pushes that
        // can overlap the transition). It must NOT end up at 50+,
        // which is what a missing reset would leave behind.
        let barrier = Arc::new(Barrier::new(PUSHERS + 1));
        let mut handles = Vec::with_capacity(PUSHERS);
        for _ in 0..PUSHERS {
            let c = Arc::clone(&collector);
            let b = Arc::clone(&barrier);
            handles.push(thread::spawn(move || {
                b.wait();
                c.record_push(1);
            }));
        }

        barrier.wait();
        collector.set_draining(true);
        for h in handles {
            h.join().unwrap();
        }

        // After the reset plus at most PUSHERS racing pushes, the
        // counter is bounded. The guarantee we want is that the 50
        // pre-drain pushes did NOT survive the reset. Anything
        // <= PUSHERS is acceptable, because the race may count any
        // subset of the racing pushes.
        let final_count = collector.pushes_since_drain_start();
        assert!(
            final_count <= PUSHERS as u64,
            "set_draining reset is broken: counter is {} after reset, \
             expected at most {} (#33)",
            final_count,
            PUSHERS
        );
    }
}
2770
/// Regression: BUG_REPORT.md #49 — `finalize_draining` must
/// drop the `shards` write lock before calling `on_shard_removed`,
/// so a callback that re-enters the mapper (read methods like
/// `shard_state`, `active_shard_ids`, etc.) does not deadlock.
///
/// Two details keep the test itself well-behaved:
/// - `finalize_draining` runs on a worker thread with a bounded wait.
///   A reintroduced deadlock then fails with a clear message instead
///   of hanging the whole test run.
/// - The callback holds only a `Weak` handle to the mapper. A strong
///   `Arc` captured by a callback that the same mapper stores would
///   form a reference cycle and leak the mapper.
#[test]
fn finalize_draining_does_not_deadlock_on_callback_reentry() {
    use std::sync::mpsc;
    use std::sync::Mutex;
    use std::thread;

    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 8,
        cooldown: Duration::from_nanos(1),
        ..Default::default()
    };
    let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());

    // Set a callback that re-enters the mapper. Before the fix,
    // this acquired `shards.read()` while finalize_draining still
    // held `shards.write()`. That deadlocks on parking_lot's
    // non-recursive RwLock.
    type Observation = (u16, Option<ShardState>);
    let observed_states: Arc<Mutex<Vec<Observation>>> = Arc::new(Mutex::new(Vec::new()));
    {
        let mapper_for_cb = Arc::downgrade(&mapper);
        let observed = Arc::clone(&observed_states);
        mapper.set_on_shard_removed(move |id| {
            // The mapper is alive while it runs finalize_draining, so
            // the upgrade is expected to succeed. If it fails, `None`
            // is recorded and the state assertion below fails.
            let st = mapper_for_cb.upgrade().and_then(|m| m.shard_state(id));
            observed.lock().unwrap().push((id, st));
        });
    }

    // Drive shard 0 to the point where finalize_draining should stop it:
    // - drain it;
    // - mark its metrics empty (current_len = 0, no pushes since the
    //   drain started);
    // - backdate drain_started far enough that the 100ms gate is met.
    mapper.drain_specific(0).unwrap();
    {
        let mut shards = mapper.shards.write();
        let s = shards.iter_mut().find(|s| s.id == 0).unwrap();
        s.metrics.current_len.store(0, AtomicOrdering::Relaxed);
        s.metrics
            .pushes_since_drain_start
            .store(0, AtomicOrdering::Relaxed);
        // `Instant - Duration` panics when the result is not
        // representable, so use the checked form and fail clearly.
        s.drain_started = Some(
            Instant::now()
                .checked_sub(Duration::from_secs(1))
                .expect("monotonic clock too close to its origin to backdate drain_started by 1s"),
        );
    }

    // With the bug present, finalize_draining never returns. Run it on
    // a worker thread and bound the wait so the test fails, not hangs.
    let (tx, rx) = mpsc::channel();
    let worker = {
        let mapper = Arc::clone(&mapper);
        thread::spawn(move || {
            // A send error only means the receiver already timed out,
            // in which case the test is failing anyway.
            let _ = tx.send(mapper.finalize_draining());
        })
    };
    let stopped = rx.recv_timeout(Duration::from_secs(10)).expect(
        "finalize_draining did not return within 10s: deadlock on callback \
         re-entry (#49), or the call panicked on the worker thread",
    );
    worker.join().expect("finalize_draining worker thread panicked");
    assert_eq!(stopped, vec![0]);

    // The callback must have run AND been able to read state,
    // which proves the lock was released.
    let observed = observed_states.lock().unwrap().clone();
    assert_eq!(observed.len(), 1);
    assert_eq!(observed[0].0, 0);
    // The state should be `Stopped`, set by finalize_draining before
    // the lock was dropped.
    assert_eq!(observed[0].1, Some(ShardState::Stopped));
}
2829
/// `activate` reports whether it actually moved a shard to `Active`.
/// Callers that keep their own counters (e.g. `ShardManager::num_shards`)
/// rely on this to avoid counting one logical transition twice.
/// Before the fix `activate` returned `Result<(), _>`, so a repeated
/// call on the same shard looked exactly like the first one.
#[test]
fn activate_returns_true_on_transition_and_false_on_idempotent_call() {
    let mapper = ShardMapper::new(
        2,
        1024,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Provision one extra shard. The two initial shards own ids 0 and
    // 1, and the newcomer is expected to be id 2.
    assert_eq!(mapper.scale_up_provisioning(1).unwrap(), vec![2]);

    // Provisioning -> Active is a genuine transition.
    let transitioned = mapper.activate(2).unwrap();
    assert!(
        transitioned,
        "first activate on a Provisioning shard must return true"
    );
    assert_eq!(mapper.active_shard_count(), 3);

    // Activating the same, already-Active shard again is idempotent.
    // No transition is reported and the active count stays put.
    let transitioned_again = mapper.activate(2).unwrap();
    assert!(
        !transitioned_again,
        "activate on an already-Active shard must return false; \
         pre-fix this returned Ok(()) and the caller couldn't tell"
    );
    assert_eq!(
        mapper.active_shard_count(),
        3,
        "idempotent activate must not bump active_count"
    );
}
2871}