net/shard/mapper.rs
1//! Dynamic shard mapping and scaling.
2//!
3//! This module implements dynamic shard scaling following Approach B from
4//! DYNAMIC_SHARD_SCALING.md: increasing the number of shards and rebalancing
5//! producers across them, while maintaining SPSC (single-producer single-consumer)
6//! semantics per shard.
7//!
8//! # Core Principles
9//!
10//! - Shards never accept multiple producers
11//! - Producers get moved to new shards when load increases
12//! - Ingestion performance stays at 700M ops/sec per shard
13//! - Ordering guarantees remain intact within a shard
14//! - Total throughput scales linearly with shard count
15//!
16//! # Scaling Triggers
17//!
18//! - Ring buffer fill ratio > threshold (default 70%)
19//! - Push latency exceeds threshold (default 5ns)
20//! - Batch flush latency exceeds threshold
21//! - Session/producer count growth
22//!
23//! # Architecture
24//!
25//! ```text
//!   Producers -----+
//!                  |
//!                  v
//!   +----------------------------+
//!   |    Dynamic Shard Mapper    |
//!   +----------------------------+
//!        |          |          |
//!        v          v          v
//!     Shard 0    Shard 1    Shard 2   …
35//! ```
36
37use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU64, Ordering as AtomicOrdering};
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40
41use parking_lot::RwLock;
42
43use crate::config::ScalingPolicy;
44
/// Point-in-time load snapshot of one shard, consumed by the scaling
/// evaluation and by weighted shard selection.
#[derive(Debug, Clone)]
pub struct ShardMetrics {
    /// Identifier of the shard this snapshot describes.
    pub shard_id: u16,
    /// Ring-buffer occupancy as a fraction of capacity (nominally 0.0 - 1.0).
    pub fill_ratio: f64,
    /// Number of events ingested during the collection window.
    pub event_rate: u64,
    /// Mean push latency over the window, in nanoseconds.
    pub avg_push_latency_ns: u64,
    /// Mean batch-flush latency over the window, in microseconds.
    pub avg_flush_latency_us: u64,
    /// `true` while the shard is in drain mode.
    pub draining: bool,
    /// Load-balancing weight produced by `compute_weight`; smaller means
    /// less loaded.
    pub weight: f64,
    /// When this snapshot was taken.
    pub last_updated: Instant,
}

impl ShardMetrics {
    /// Builds an all-zero, non-draining snapshot for `shard_id`,
    /// timestamped now.
    pub fn new(shard_id: u16) -> Self {
        let last_updated = Instant::now();
        Self {
            shard_id,
            draining: false,
            fill_ratio: 0.0,
            weight: 0.0,
            event_rate: 0,
            avg_push_latency_ns: 0,
            avg_flush_latency_us: 0,
            last_updated,
        }
    }

    /// Recomputes `weight` from the current fields.
    ///
    /// The weight adds three load signals: fill ratio × 100, push latency
    /// (ns) ÷ 10, and event count ÷ 1 000 000. Lower values mark better
    /// targets for new producers. A draining shard is pinned to `f64::MAX`
    /// so it is never chosen for new producers.
    pub fn compute_weight(&mut self) {
        self.weight = if self.draining {
            f64::MAX
        } else {
            let from_fill = self.fill_ratio * 100.0;
            let from_latency = self.avg_push_latency_ns as f64 / 10.0;
            let from_rate = self.event_rate as f64 / 1_000_000.0;
            from_fill + from_latency + from_rate
        };
    }
}
100
101/// Live metrics collector for a shard (atomics for lock-free updates).
102#[derive(Debug)]
103pub struct ShardMetricsCollector {
104 /// Shard identifier.
105 shard_id: u16,
106 /// Ring buffer capacity.
107 capacity: usize,
108 /// Current buffer length (updated by shard).
109 current_len: AtomicU64,
110 /// Events ingested in current window.
111 events_in_window: AtomicU64,
112 /// Packed `(count << 32) | sum` for push latencies (ns).
113 /// Pre-fix `push_latency_sum_ns` and `push_count` were
114 /// independent `AtomicU64`s. `record_push` did two separate
115 /// `fetch_add`s, and `collect_and_reset` did two separate
116 /// `swap`s. A metrics tick interleaving between the two
117 /// `fetch_add`s captured the sum WITHOUT the count (or
118 /// vice versa); the resulting `avg = sum.checked_div(count)
119 /// .unwrap_or(0)` returned 0 in window N (sum without
120 /// count) and 0 in window N+1 (count without sum), silently
121 /// zeroing the average that drives `evaluate_scaling`'s
122 /// push-latency scale-up trigger. Packing into one u64 makes
123 /// the `(sum, count)` update atomic; the upper 32 bits hold
124 /// the count (u32::MAX = 4G calls/window — plenty) and the
125 /// lower 32 hold the sum (u32::MAX = 4 G ns ≈ 4 s, also
126 /// plenty for any sane window).
127 push_latency: AtomicU64,
128 /// Packed `(count << 32) | sum` for flush latencies (us).
129 /// Same shape and rationale as `push_latency`. The lower 32
130 /// bits hold sum-µs (u32::MAX ≈ 4 Gµs ≈ 67 minutes — far
131 /// past any plausible window).
132 flush_latency: AtomicU64,
133 /// Whether this shard is draining.
134 draining: AtomicBool,
135 /// Window start time.
136 window_start: RwLock<Instant>,
137 /// Pushes observed since `set_draining(true)` was last called.
138 /// Distinct from `events_in_window` because this counter is NOT
139 /// reset by `collect_and_reset`. `finalize_draining` reads this
140 /// instead of `events_in_window` so a drain-window-overlap with
141 /// a metrics tick can no longer race the counter to zero before
142 /// the producer is observed.
143 pushes_since_drain_start: AtomicU64,
144}
145
146impl ShardMetricsCollector {
147 /// Create a new metrics collector.
148 pub fn new(shard_id: u16, capacity: usize) -> Self {
149 Self {
150 shard_id,
151 capacity,
152 current_len: AtomicU64::new(0),
153 events_in_window: AtomicU64::new(0),
154 push_latency: AtomicU64::new(0),
155 flush_latency: AtomicU64::new(0),
156 draining: AtomicBool::new(false),
157 window_start: RwLock::new(Instant::now()),
158 pushes_since_drain_start: AtomicU64::new(0),
159 }
160 }
161
162 /// Record current buffer length.
163 #[inline]
164 pub fn record_buffer_len(&self, len: usize) {
165 self.current_len.store(len as u64, AtomicOrdering::Relaxed);
166 }
167
168 /// Record an event ingestion.
169 #[inline]
170 pub fn record_push(&self, latency_ns: u64) {
171 self.events_in_window.fetch_add(1, AtomicOrdering::Relaxed);
172 // Atomically add 1 to count (upper 32 bits) and
173 // `latency_ns` to sum (lower 32 bits). `fetch_update`
174 // CAS-loops the load-and-store, so a concurrent
175 // `collect_and_reset` swap on the same word either sees
176 // both pre-add or both post-add — no `(sum, count)`
177 // desync. Saturating ops cap at u32::MAX inside the
178 // pack window (~4 G calls / 4 s of accumulated latency),
179 // which is far beyond any sane metrics tick.
180 let _ =
181 self.push_latency
182 .fetch_update(AtomicOrdering::Relaxed, AtomicOrdering::Relaxed, |v| {
183 let count = (v >> 32) as u32;
184 let sum = (v & 0xFFFF_FFFF) as u32;
185 let new_count = count.saturating_add(1) as u64;
186 let new_sum = sum.saturating_add(latency_ns.min(u32::MAX as u64) as u32) as u64;
187 Some((new_count << 32) | new_sum)
188 });
189 // Always increment — the cost is one fetch_add and the
190 // counter only matters when the shard is draining. Cheaper
191 // than branching on `self.draining.load()` in the hot path.
192 self.pushes_since_drain_start
193 .fetch_add(1, AtomicOrdering::Relaxed);
194 }
195
196 /// Record a batch flush.
197 #[inline]
198 pub fn record_flush(&self, latency_us: u64) {
199 // Same packed-`(count, sum)` shape as `record_push` —
200 // see that function for the desync rationale.
201 let _ = self.flush_latency.fetch_update(
202 AtomicOrdering::Relaxed,
203 AtomicOrdering::Relaxed,
204 |v| {
205 let count = (v >> 32) as u32;
206 let sum = (v & 0xFFFF_FFFF) as u32;
207 let new_count = count.saturating_add(1) as u64;
208 let new_sum = sum.saturating_add(latency_us.min(u32::MAX as u64) as u32) as u64;
209 Some((new_count << 32) | new_sum)
210 },
211 );
212 }
213
214 /// Set drain mode.
215 ///
216 /// Transitions to draining reset `pushes_since_drain_start` so
217 /// `finalize_draining` only counts pushes that arrived after the
218 /// drain began.
219 ///
220 /// A concurrent `record_push` can interleave with the store-zero
221 /// on `pushes_since_drain_start` and leave the counter at `1`
222 /// rather than `0`. That's not a correctness bug — it just defers
223 /// finalization of this shard by one metrics tick — but the
224 /// previous code did the store-zero on the counter *before*
225 /// publishing the `draining=true` flag, which made the window
226 /// slightly larger than necessary. Publishing the flag first
227 /// means any push that *observes* `draining=true` is naturally
228 /// sequenced after the reset; pushes that beat the flag publish
229 /// race the reset just like before.
230 ///
231 /// We also use `SeqCst` for the publish to give the rest of the
232 /// crate a single total order on draining transitions, which
233 /// matches the ordering on `try_enter_ingest`'s shutdown flag.
234 pub fn set_draining(&self, draining: bool) {
235 if draining {
236 // Store-zero first (so the flag publish below acts as
237 // the release-fence for both writes).
238 self.pushes_since_drain_start
239 .store(0, AtomicOrdering::SeqCst);
240 }
241 self.draining.store(draining, AtomicOrdering::SeqCst);
242 }
243
244 /// Number of pushes observed since `set_draining(true)` was
245 /// last called. Used by `finalize_draining` to detect lingering
246 /// producers that the window-reset `events_in_window` counter
247 /// can race past.
248 ///
249 /// Pre-fix used `Ordering::Relaxed`, but the writer
250 /// side (`set_draining(true)`) resets the counter under
251 /// `SeqCst`. On weakly-ordered hardware (ARM), a Relaxed
252 /// reader could observe a stale counter and `finalize_draining`
253 /// would falsely conclude the drain had flushed while
254 /// producers were still pushing. Acquire pairs with the
255 /// SeqCst release of the reset (SeqCst includes Release
256 /// semantics), making the reset happen-before this load.
257 pub fn pushes_since_drain_start(&self) -> u64 {
258 self.pushes_since_drain_start.load(AtomicOrdering::Acquire)
259 }
260
261 /// Check if draining.
262 pub fn is_draining(&self) -> bool {
263 self.draining.load(AtomicOrdering::Acquire)
264 }
265
266 /// Collect metrics and reset window counters.
267 ///
268 /// NOTE: The individual atomic swaps below are not collectively atomic with
269 /// respect to concurrent `record_push`/`record_flush` calls. This means a
270 /// push recorded between, say, the `events_in_window` swap and the
271 /// `push_count` swap could be counted in one counter but not the other for
272 /// a given window. This small inaccuracy is an accepted trade-off to
273 /// preserve the lock-free design of the hot path (`record_push` /
274 /// `record_flush`). Adding a `Mutex` here would serialize the hot path
275 /// and defeat the purpose of using atomics.
276 pub fn collect_and_reset(&self) -> ShardMetrics {
277 let current_len = self.current_len.load(AtomicOrdering::Relaxed);
278 let events = self.events_in_window.swap(0, AtomicOrdering::Relaxed);
279 // Single swap captures `(count, sum)` together; no
280 // chance of catching the sum without the matching count
281 // (or vice versa) the way two independent swaps did.
282 let push_packed = self.push_latency.swap(0, AtomicOrdering::Relaxed);
283 let push_count = push_packed >> 32;
284 let push_latency_sum = push_packed & 0xFFFF_FFFF;
285 let flush_packed = self.flush_latency.swap(0, AtomicOrdering::Relaxed);
286 let flush_count = flush_packed >> 32;
287 let flush_latency_sum = flush_packed & 0xFFFF_FFFF;
288
289 let fill_ratio = if self.capacity > 0 {
290 current_len as f64 / self.capacity as f64
291 } else {
292 0.0
293 };
294
295 let avg_push_latency = push_latency_sum.checked_div(push_count).unwrap_or(0);
296
297 let avg_flush_latency = flush_latency_sum.checked_div(flush_count).unwrap_or(0);
298
299 // Reset window
300 *self.window_start.write() = Instant::now();
301
302 let mut metrics = ShardMetrics {
303 shard_id: self.shard_id,
304 fill_ratio,
305 event_rate: events,
306 avg_push_latency_ns: avg_push_latency,
307 avg_flush_latency_us: avg_flush_latency,
308 draining: self.draining.load(AtomicOrdering::Acquire),
309 weight: 0.0,
310 last_updated: Instant::now(),
311 };
312 metrics.compute_weight();
313 metrics
314 }
315}
316
/// Outcome of `ShardMapper::evaluate_scaling`. It is advisory only;
/// nothing changes until the caller acts on it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ScalingDecision {
    /// Leave the shard count as it is.
    None,
    /// Add the given number of shards.
    ScaleUp(u16),
    /// Retire the given number of shards by marking them for draining.
    ScaleDown(u16),
}
327
/// Reasons a scaling operation can be refused or fail.
#[derive(Debug, Clone)]
pub enum ScalingError {
    /// The supplied scaling policy failed validation.
    InvalidPolicy(String),
    /// The shard budget (`max_shards`) or the shard-id space is exhausted.
    AtMaxShards,
    /// Already at the minimum shard count.
    AtMinShards,
    /// The previous scaling operation is still within its cooldown.
    InCooldown,
    /// A new shard could not be created.
    ShardCreationFailed(String),
}

impl std::fmt::Display for ScalingError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Variants carrying a message format it inline; the rest map to a
        // fixed string.
        let fixed = match self {
            Self::InvalidPolicy(msg) => return write!(f, "invalid scaling policy: {}", msg),
            Self::ShardCreationFailed(msg) => return write!(f, "shard creation failed: {}", msg),
            Self::AtMaxShards => "already at maximum shard count",
            Self::AtMinShards => "already at minimum shard count",
            Self::InCooldown => "scaling operation in cooldown",
        };
        f.write_str(fixed)
    }
}

impl std::error::Error for ScalingError {}
356
/// Lifecycle state of a shard tracked by the mapper.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShardState {
    /// The id and metrics collector exist, but the shard is not routable
    /// yet because its drain/batch workers have not been spawned.
    /// `select_shard` skips it until the caller moves it to `Active` via
    /// `activate`. This closes the window in which a brand-new shard could
    /// accept producer pushes before any consumer existed.
    Provisioning,
    /// Routable: accepts new producers.
    Active,
    /// Accepts no new producers; waiting for the buffer to empty.
    Draining,
    /// Fully stopped; eligible for removal.
    Stopped,
}
374
375/// Information about a shard managed by the mapper.
376#[derive(Debug)]
377struct MappedShard {
378 /// Shard ID.
379 id: u16,
380 /// Current state.
381 state: ShardState,
382 /// Metrics collector.
383 metrics: Arc<ShardMetricsCollector>,
384 /// When this shard entered drain mode (if draining).
385 drain_started: Option<Instant>,
386 /// Last collected metrics.
387 last_metrics: ShardMetrics,
388 /// When this shard last transitioned to `Active`. Used by
389 /// `evaluate_scaling` to skip recently-activated shards from
390 /// scale decisions: their `last_metrics` is the
391 /// `ShardMetrics::new(id)` placeholder until at least one
392 /// `collect_metrics` cycle has run, and the placeholder
393 /// (`fill_ratio = 0.0, event_rate = 0`) trips the
394 /// underutilized trigger immediately — oscillating the system
395 /// (scale-up → next tick scale-down → next tick scale-up …)
396 /// when a fresh shard is added but hasn't yet absorbed any
397 /// traffic.
398 activated_at: Instant,
399}
400
/// Boxed, thread-safe hook invoked with a shard id on shard lifecycle
/// events (creation / removal).
type ShardCallback = Box<dyn Fn(u16) + Sync + Send>;
403
404/// Dynamic shard mapper that manages shard allocation and producer routing.
405///
406/// This is the core component for dynamic scaling. It:
407/// - Tracks metrics for all shards
408/// - Makes scaling decisions based on policy
409/// - Routes producers to the least-loaded shards
410/// - Manages shard lifecycle (active → draining → stopped)
411pub struct ShardMapper {
412 /// Mapped shards (RwLock for concurrent reads, rare writes).
413 shards: RwLock<Vec<MappedShard>>,
414 /// Current active shard count.
415 active_count: AtomicU16,
416 /// Scaling policy.
417 ///
418 /// This field is immutable for the lifetime of the mapper. The
419 /// previous `set_policy(&mut self, …)` API was unreachable in
420 /// practice — every production callsite holds the mapper behind
421 /// an `Arc`, and `Arc::get_mut` requires a strong count of 1,
422 /// which never holds once the worker pool clones the `Arc`. The
423 /// method has been removed; recreate the mapper (and
424 /// the bus that owns it) to change the policy.
425 policy: ScalingPolicy,
426 /// Ring buffer capacity for new shards.
427 ring_buffer_capacity: usize,
428 /// Last scaling operation timestamp.
429 ///
430 /// This RwLock is **logically** scoped to the
431 /// outer `shards.write()` lock — `scale_up`, `scale_down`,
432 /// and `scale_up_provisioning` all read this field and write
433 /// to it while holding `shards.write()`. The cooldown gate
434 /// is therefore atomic with the scale mutation: no caller
435 /// can pass the cooldown check, observe stale `last_scaling`,
436 /// and have its mutation interleave with another caller.
437 ///
438 /// **If you narrow `shards.write()`'s scope in any of those
439 /// callers, this implicit serialization breaks.** Either:
440 /// - Keep the cooldown read+write inside the outer lock, OR
441 /// - Use a `compare_exchange`-style update on a single
442 /// `AtomicI64` of nanos so the gate is atomic on its own.
443 ///
444 /// The doc-comment is here so a future refactorer doesn't
445 /// silently break the contract.
446 last_scaling: RwLock<Option<Instant>>,
447 /// Callback for shard creation (provided by ShardManager).
448 on_shard_created: RwLock<Option<ShardCallback>>,
449 /// Callback for shard removal (provided by ShardManager).
450 on_shard_removed: RwLock<Option<ShardCallback>>,
451 /// Monotonic shard-id allocator. The next `scale_up` call gets
452 /// `fetch_add(1)` from here. Distinct from `shards.iter().max() +
453 /// 1`: that approach reused ids whenever the highest-numbered
454 /// shard had been drained-and-removed, silently merging two
455 /// unrelated shard lifetimes in any external system that keys
456 /// metrics or checkpoints on shard id. Monotonic allocation
457 /// ensures every shard ever allocated has a globally unique id
458 /// for the lifetime of this mapper.
459 next_shard_id: AtomicU16,
460}
461
462impl ShardMapper {
463 /// Create a new shard mapper with the given initial shard count and policy.
464 pub fn new(
465 initial_shards: u16,
466 ring_buffer_capacity: usize,
467 policy: ScalingPolicy,
468 ) -> Result<Self, ScalingError> {
469 let mut policy = policy.normalize();
470 // Ensure max_shards can accommodate the initial shard count
471 if policy.max_shards < initial_shards {
472 policy.max_shards = initial_shards;
473 }
474 policy
475 .validate()
476 .map_err(|e| ScalingError::InvalidPolicy(e.to_string()))?;
477
478 // Initial shards: stamp `activated_at` far enough in the
479 // past that they're not subject to the warmup skip in
480 // `evaluate_scaling`. The boot-time shards have whatever
481 // baseline traffic the system serves; they shouldn't be
482 // exempted from scale decisions just because the mapper
483 // was just constructed.
484 let boot = Instant::now()
485 .checked_sub(std::time::Duration::from_secs(3600))
486 .unwrap_or_else(Instant::now);
487 let shards: Vec<MappedShard> = (0..initial_shards)
488 .map(|id| MappedShard {
489 id,
490 state: ShardState::Active,
491 metrics: Arc::new(ShardMetricsCollector::new(id, ring_buffer_capacity)),
492 drain_started: None,
493 last_metrics: ShardMetrics::new(id),
494 activated_at: boot,
495 })
496 .collect();
497
498 Ok(Self {
499 shards: RwLock::new(shards),
500 active_count: AtomicU16::new(initial_shards),
501 policy,
502 ring_buffer_capacity,
503 last_scaling: RwLock::new(None),
504 on_shard_created: RwLock::new(None),
505 on_shard_removed: RwLock::new(None),
506 // Initial shards occupy ids `[0, initial_shards)`, so the
507 // first scale-up takes `initial_shards`.
508 next_shard_id: AtomicU16::new(initial_shards),
509 })
510 }
511
512 /// Set callback for shard creation.
513 pub fn set_on_shard_created<F>(&self, callback: F)
514 where
515 F: Fn(u16) + Send + Sync + 'static,
516 {
517 *self.on_shard_created.write() = Some(Box::new(callback));
518 }
519
520 /// Set callback for shard removal.
521 pub fn set_on_shard_removed<F>(&self, callback: F)
522 where
523 F: Fn(u16) + Send + Sync + 'static,
524 {
525 *self.on_shard_removed.write() = Some(Box::new(callback));
526 }
527
528 /// Get the metrics collector for a shard.
529 pub fn metrics_collector(&self, shard_id: u16) -> Option<Arc<ShardMetricsCollector>> {
530 let shards = self.shards.read();
531 shards
532 .iter()
533 .find(|s| s.id == shard_id)
534 .map(|s| s.metrics.clone())
535 }
536
537 /// Get current active shard count.
538 pub fn active_shard_count(&self) -> u16 {
539 self.active_count.load(AtomicOrdering::Acquire)
540 }
541
542 /// Get total shard count (including draining).
543 pub fn total_shard_count(&self) -> u16 {
544 self.shards.read().len() as u16
545 }
546
547 /// Select the best shard for a new event/producer.
548 ///
549 /// This implements weighted shard selection:
550 /// - Only considers active (non-draining) shards
551 /// - Prefers shards with lower weight (less loaded)
552 /// - Falls back to round-robin if weights are equal
553 #[inline]
554 pub fn select_shard(&self, event_hash: u64) -> u16 {
555 let shards = self.shards.read();
556
557 // Filter to active shards only
558 let active: Vec<_> = shards
559 .iter()
560 .filter(|s| s.state == ShardState::Active)
561 .collect();
562
563 if active.is_empty() {
564 // Previously fell back to `find(|s| s.state !=
565 // ShardState::Stopped)`, which silently routed to a
566 // `Draining` shard. Pushes to a draining shard increment
567 // `pushes_since_drain_start`, blocking finalization
568 // indefinitely.
569 //
570 // `Provisioning` shards aren't routable either — they
571 // exist in the mapper but the routing table doesn't have
572 // them yet, so any caller that tries to push lands
573 // in `resolve_idx → None` and surfaces as "unrouted" via
574 // the manager-level counter. That's the correct signal:
575 // the system has no destination for this event, the
576 // caller should back off and retry.
577 //
578 // Return `u16::MAX` (out-of-band sentinel) so callers
579 // that look up the id in the routing table get a
580 // definite miss, which the manager already accounts for.
581 return u16::MAX;
582 }
583
584 // Find the shard with lowest weight
585 let min_weight = active
586 .iter()
587 .map(|s| s.last_metrics.weight)
588 .fold(f64::MAX, f64::min);
589
590 // Get all shards with the minimum weight (within tolerance)
591 let candidates: Vec<_> = active
592 .iter()
593 .filter(|s| (s.last_metrics.weight - min_weight).abs() < 0.1)
594 .collect();
595
596 // Use hash to pick among candidates for determinism
597 // Fallback to first active shard if tolerance filter excludes all (e.g. NaN weights)
598 if candidates.is_empty() {
599 return active[0].id;
600 }
601 // Pre-fix used `(event_hash as usize) % candidates.len()`,
602 // which biases low-bucket indices when `candidates.len()`
603 // is not a power of two. With u64 hashes the bias is small
604 // but non-zero and accumulates over time as a sustained
605 // skew toward shards at the low end of the candidate
606 // vector. Lemire's technique
607 // (https://lemire.me/blog/2016/06/30/fast-random-shuffling/)
608 // computes the index as `(hash * len) >> 64` — an unbiased
609 // integer mapping for any `len` that fits in u64.
610 let idx = ((event_hash as u128 * candidates.len() as u128) >> 64) as usize;
611 candidates[idx].id
612 }
613
614 /// Collect metrics from all shards and update weights.
615 pub fn collect_metrics(&self) -> Vec<ShardMetrics> {
616 let mut shards = self.shards.write();
617 shards
618 .iter_mut()
619 .map(|s| {
620 s.last_metrics = s.metrics.collect_and_reset();
621 s.last_metrics.clone()
622 })
623 .collect()
624 }
625
626 /// Evaluate scaling based on current metrics.
627 ///
628 /// Returns a scaling decision without executing it.
629 pub fn evaluate_scaling(&self) -> ScalingDecision {
630 if !self.policy.auto_scale {
631 return ScalingDecision::None;
632 }
633
634 // Check cooldown
635 if let Some(last) = *self.last_scaling.read() {
636 if last.elapsed() < self.policy.cooldown {
637 return ScalingDecision::None;
638 }
639 }
640
641 let shards = self.shards.read();
642 let active_count = self.active_count.load(AtomicOrdering::Acquire);
643
644 // Check for scale-up triggers
645 let mut overloaded_count = 0;
646 let mut underutilized_count = 0;
647
648 // Warmup window for freshly-activated shards. A just-
649 // activated shard's `last_metrics` is the
650 // `ShardMetrics::new(id)` placeholder
651 // (`fill_ratio = 0.0, event_rate = 0`) until at least one
652 // `collect_metrics` cycle has run. The placeholder
653 // immediately matches the underutilized trigger
654 // (`fill_ratio < underutilized_threshold && event_rate
655 // == 0`), so a fresh shard added by scale-up would
656 // immediately count as underutilized on the next
657 // `evaluate_scaling` and trigger scale-down — oscillating
658 // the system. Reuse `policy.cooldown` as the warmup
659 // window: it's already the minimum gap between scaling
660 // actions, and a shard collected at least once within
661 // that window has accumulated real metrics.
662 let now = Instant::now();
663 let warmup = self.policy.cooldown;
664
665 for shard in shards.iter() {
666 if shard.state != ShardState::Active {
667 continue;
668 }
669
670 // Skip the placeholder-metrics window for freshly-
671 // activated shards. They count toward `active_count`
672 // (so the budget math stays consistent) but don't
673 // tip the overload/underutilized tallies.
674 if now.duration_since(shard.activated_at) < warmup {
675 continue;
676 }
677
678 let m = &shard.last_metrics;
679
680 // Scale-up triggers
681 if m.fill_ratio > self.policy.fill_ratio_threshold
682 || m.avg_push_latency_ns > self.policy.push_latency_threshold_ns
683 || m.avg_flush_latency_us > self.policy.flush_latency_threshold_us
684 {
685 overloaded_count += 1;
686 }
687
688 // Scale-down triggers
689 if m.fill_ratio < self.policy.underutilized_threshold && m.event_rate == 0 {
690 underutilized_count += 1;
691 }
692 }
693
694 // Scale up if more than half of shards are overloaded
695 if overloaded_count > active_count / 2 && active_count < self.policy.max_shards {
696 // Add shards proportional to overload
697 let shards_to_add = (overloaded_count / 2)
698 .max(1)
699 .min(self.policy.max_shards - active_count);
700 return ScalingDecision::ScaleUp(shards_to_add);
701 }
702
703 // Scale down if more than half of shards are underutilized
704 // and we're above minimum
705 if underutilized_count > active_count / 2 && active_count > self.policy.min_shards {
706 let shards_to_remove = (underutilized_count / 2)
707 .max(1)
708 .min(active_count - self.policy.min_shards);
709 return ScalingDecision::ScaleDown(shards_to_remove);
710 }
711
712 ScalingDecision::None
713 }
714
715 /// Validate cooldown + max-shards budget for an `add count`
716 /// scale-up request. Cheap pre-check that doesn't take the
717 /// write lock — callers re-check under the lock.
718 fn check_scale_up_budget(&self, count: u16) -> Result<(), ScalingError> {
719 let current = self.active_count.load(AtomicOrdering::Acquire);
720 let would_be = current
721 .checked_add(count)
722 .ok_or(ScalingError::AtMaxShards)?;
723 if would_be > self.policy.max_shards {
724 return Err(ScalingError::AtMaxShards);
725 }
726 let last = self.last_scaling.read();
727 if let Some(ts) = *last {
728 if ts.elapsed() < self.policy.cooldown {
729 return Err(ScalingError::InCooldown);
730 }
731 }
732 Ok(())
733 }
734
735 /// Allocate `count` shard ids and push their `MappedShard`
736 /// records into `shards` with the supplied `state`. The caller
737 /// already holds `self.shards.write()` and is responsible for
738 /// dropping it before notifying callbacks. Returns the allocated
739 /// ids in order.
740 ///
741 /// Performs the budget + cooldown re-check under the write lock,
742 /// the next_shard_id allocation, and the per-shard push. Does NOT
743 /// touch `active_count` — `scale_up` bumps it for `Active` shards;
744 /// `Provisioning` shards bump it later when `activate` fires.
745 fn allocate_shards_inner(
746 &self,
747 count: u16,
748 state: ShardState,
749 shards: &mut Vec<MappedShard>,
750 ) -> Result<Vec<u16>, ScalingError> {
751 self.allocate_shards_inner_with_policy(count, state, shards, false)
752 }
753
754 fn allocate_shards_inner_with_policy(
755 &self,
756 count: u16,
757 state: ShardState,
758 shards: &mut Vec<MappedShard>,
759 force: bool,
760 ) -> Result<Vec<u16>, ScalingError> {
761 // Re-check budget under the write lock — two concurrent
762 // scale-up callers could both pass the read-locked early
763 // check, both serialize through `shards.write()`, and both
764 // succeed without this re-check.
765 if force {
766 // Budget only — skip cooldown for operator-initiated paths.
767 let current = self.active_count.load(AtomicOrdering::Acquire);
768 let would_be = current
769 .checked_add(count)
770 .ok_or(ScalingError::AtMaxShards)?;
771 if would_be > self.policy.max_shards {
772 return Err(ScalingError::AtMaxShards);
773 }
774 } else {
775 self.check_scale_up_budget(count)?;
776 }
777
778 let first_id = self.next_shard_id.load(AtomicOrdering::Relaxed);
779 let last_needed = first_id
780 .checked_add(count.saturating_sub(1))
781 .ok_or(ScalingError::AtMaxShards)?;
782 // Reserve `u16::MAX` as a sentinel so the post-allocation
783 // store cannot wrap.
784 if last_needed == u16::MAX {
785 return Err(ScalingError::AtMaxShards);
786 }
787 // `first_id + count == last_needed + 1`. We already
788 // refused `last_needed == u16::MAX` above, so the sum is
789 // provably <= u16::MAX. Use `checked_add` anyway as a
790 // belt-and-suspenders guard: a future change that
791 // weakens the sentinel check would otherwise reach an
792 // unchecked u16 wrap here, silently rolling
793 // `next_shard_id` back to 0 and then re-issuing already-
794 // allocated ids.
795 let next_id_after = first_id
796 .checked_add(count)
797 .ok_or(ScalingError::AtMaxShards)?;
798 self.next_shard_id
799 .store(next_id_after, AtomicOrdering::Relaxed);
800
801 let mut new_ids = Vec::with_capacity(count as usize);
802 let now = Instant::now();
803 for i in 0..count {
804 let new_id = first_id + i;
805 shards.push(MappedShard {
806 id: new_id,
807 state,
808 metrics: Arc::new(ShardMetricsCollector::new(
809 new_id,
810 self.ring_buffer_capacity,
811 )),
812 drain_started: None,
813 last_metrics: ShardMetrics::new(new_id),
814 // Stamp the activation moment so `evaluate_scaling`
815 // can skip this shard until at least one collect
816 // cycle has run. Prevents the placeholder
817 // (`fill_ratio = 0, event_rate = 0`) from
818 // immediately tripping the underutilized trigger
819 // and oscillating the system.
820 activated_at: now,
821 });
822 new_ids.push(new_id);
823 }
824 Ok(new_ids)
825 }
826
827 /// Execute a scale-up operation.
828 ///
829 /// Creates new shards in the `Active` state and makes them
830 /// immediately available for routing. Use [`scale_up_provisioning`]
831 /// if upstream workers (drain / batch) need to be wired up before
832 /// the shard becomes selectable — otherwise producer pushes can
833 /// race ahead of consumer creation.
834 ///
835 /// [`scale_up_provisioning`]: Self::scale_up_provisioning
836 pub fn scale_up(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
837 // Short-circuit `count == 0` so a no-op call doesn't bump the
838 // cooldown timestamp or trip the `u16::MAX` sentinel check
839 // (which previously fired spuriously when
840 // `first_id == u16::MAX` even though zero ids were being
841 // allocated).
842 if count == 0 {
843 return Ok(Vec::new());
844 }
845
846 self.check_scale_up_budget(count)?;
847
848 let mut shards = self.shards.write();
849 let new_ids = self.allocate_shards_inner(count, ShardState::Active, &mut shards)?;
850
851 // Update counts. `fetch_add` cannot wrap here — the
852 // `check_scale_up_budget` gate above ensures `current + count <=
853 // max_shards <= u16::MAX` — but keep the ordering explicit
854 // so the next contender's cooldown re-check sees the fresh
855 // timestamp.
856 self.active_count.fetch_add(count, AtomicOrdering::Release);
857 *self.last_scaling.write() = Some(Instant::now());
858
859 // Drop the write lock before notifying callbacks — they
860 // are user-supplied and may take arbitrary time.
861 drop(shards);
862
863 if let Some(callback) = self.on_shard_created.read().as_ref() {
864 for &id in &new_ids {
865 callback(id);
866 }
867 }
868
869 Ok(new_ids)
870 }
871
872 /// Like [`scale_up`], but the new shards are created in the
873 /// `Provisioning` state. They receive an id and a metrics
874 /// collector, but `select_shard` will not route to them and they
875 /// are excluded from `active_shard_count` / `evaluate_scaling`
876 /// until the caller transitions each shard with [`activate`].
877 ///
878 /// Use this when upstream consumer infrastructure (drain/batch
879 /// workers, mpsc channels, etc.) must be wired up *before* the
880 /// shard becomes selectable. Without this gating, producers can
881 /// observe the shard via `select_shard`, push into its ring
882 /// buffer, and never have those events drained.
883 ///
884 /// Returns the allocated ids in order. Cooldown / `max_shards`
885 /// gating matches `scale_up` so that staged allocation cannot be
886 /// used to bypass the policy.
887 ///
888 /// [`scale_up`]: Self::scale_up
889 /// [`activate`]: Self::activate
890 pub fn scale_up_provisioning(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
891 if count == 0 {
892 return Ok(Vec::new());
893 }
894
895 self.check_scale_up_budget(count)?;
896
897 let mut shards = self.shards.write();
898 let new_ids = self.allocate_shards_inner(count, ShardState::Provisioning, &mut shards)?;
899
900 // Bump cooldown but NOT active_count — these shards are
901 // not active yet. `activate` bumps active_count when each
902 // becomes selectable.
903 *self.last_scaling.write() = Some(Instant::now());
904 drop(shards);
905 // Intentionally do NOT fire `on_shard_created` here — the
906 // callback signals "shard is live"; provisioning shards are
907 // not. `activate` fires it instead.
908 Ok(new_ids)
909 }
910
911 /// Allocate `count` Provisioning shards, bypassing the cooldown gate.
912 ///
913 /// Used by operator-initiated `manual_scale_up` paths. The
914 /// cooldown exists to prevent the auto-scaling monitor from
915 /// scaling-up too aggressively in response to transient
916 /// load spikes; a manual call from an operator is a
917 /// deliberate request that should not be rate-limited by
918 /// the auto-scaling cadence. The budget check (against
919 /// `max_shards`) still applies.
920 ///
921 /// Pre-fix `manual_scale_up(N)` looped `add_shard()` N
922 /// times, each call invoking `scale_up_provisioning(1)`
923 /// which bumped `last_scaling`. The second call then
924 /// immediately failed with `InCooldown` (default 30s
925 /// cooldown), leaving the first shard half-added and
926 /// returning an error to the operator with no rollback.
927 pub fn scale_up_provisioning_force(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
928 if count == 0 {
929 return Ok(Vec::new());
930 }
931
932 let mut shards = self.shards.write();
933 let new_ids = self.allocate_shards_inner_with_policy(
934 count,
935 ShardState::Provisioning,
936 &mut shards,
937 true, // skip cooldown
938 )?;
939
940 // Still bump the cooldown timestamp so the next
941 // *auto-scaling* tick respects the cooldown floor.
942 *self.last_scaling.write() = Some(Instant::now());
943 drop(shards);
944 Ok(new_ids)
945 }
946
947 /// Transition a `Provisioning` shard to `Active`.
948 ///
949 /// Returns `Ok(true)` if a state transition actually occurred
950 /// (Provisioning → Active) and `Ok(false)` if the shard was
951 /// already `Active` — the latter is the idempotent path.
952 /// Returns `InvalidPolicy` for unknown or `Draining`/`Stopped`
953 /// shards — those states require a different lifecycle path.
954 /// Bumps `active_count` and notifies the `on_shard_created`
955 /// callback exactly once per real transition.
956 ///
957 /// Pre-fix this returned `Result<(), ScalingError>`,
958 /// so callers (notably `ShardManager::activate_shard`) could
959 /// not tell whether they had bumped the live count or not and
960 /// double-incremented their own `num_shards` on every
961 /// idempotent call.
962 pub fn activate(&self, shard_id: u16) -> Result<bool, ScalingError> {
963 let mut shards = self.shards.write();
964 let shard = shards
965 .iter_mut()
966 .find(|s| s.id == shard_id)
967 .ok_or_else(|| {
968 ScalingError::InvalidPolicy(format!("activate: shard {} not found", shard_id))
969 })?;
970 match shard.state {
971 ShardState::Active => return Ok(false),
972 ShardState::Provisioning => {
973 // Gate activation on
974 // `active_count < max_shards`. The budget gate at
975 // `check_scale_up_budget` only counts ALREADY-active
976 // shards — multiple `scale_up_provisioning(1)` calls
977 // can each pass (they don't bump `active_count`),
978 // and an unconditional `fetch_add(1)` here would
979 // push past `max_shards`. Subsequent
980 // `evaluate_scaling`'s `max_shards - active_count`
981 // arithmetic would then underflow u16 (debug-build
982 // panic; release wraps to ~65530). Activate-time
983 // re-checks the budget with the lock held; the
984 // Provisioning shard stays in Provisioning state and
985 // the caller (e.g. `add_shard_internal`'s rollback)
986 // is responsible for tearing it down.
987 //
988 // The load + state mutation + `fetch_add` all happen
989 // while we hold the `shards.write()` guard so a
990 // concurrent `activate(distinct_id)` reads the
991 // already-bumped `active_count` and hits
992 // `AtMaxShards` instead of squeezing through the
993 // window between our state update and our
994 // `fetch_add`. Pre-fix the `fetch_add` ran after
995 // `drop(shards)` — two activates could each see a
996 // stale count below `max_shards` and both bump,
997 // transiently overshooting the budget.
998 let current = self.active_count.load(AtomicOrdering::Acquire);
999 if current >= self.policy.max_shards {
1000 return Err(ScalingError::AtMaxShards);
1001 }
1002 shard.state = ShardState::Active;
1003 // Re-stamp `activated_at` so `evaluate_scaling`
1004 // gives this freshly-activated shard a warmup
1005 // window before counting it in the
1006 // overloaded/underutilized tallies. The
1007 // Provisioning → Active transition is the moment
1008 // traffic starts flowing; the shard's
1009 // `last_metrics` is still the
1010 // `ShardMetrics::new(id)` placeholder until the
1011 // next `collect_metrics` cycle.
1012 shard.activated_at = Instant::now();
1013 // Publish the increment while still holding the
1014 // write lock. `Release` here pairs with the
1015 // `Acquire` load above so any later activator that
1016 // takes the same lock observes our bump.
1017 self.active_count.fetch_add(1, AtomicOrdering::Release);
1018 }
1019 ShardState::Draining | ShardState::Stopped => {
1020 return Err(ScalingError::InvalidPolicy(format!(
1021 "activate: shard {} is in state {:?}, cannot activate",
1022 shard_id, shard.state
1023 )));
1024 }
1025 }
1026 drop(shards);
1027
1028 if let Some(callback) = self.on_shard_created.read().as_ref() {
1029 callback(shard_id);
1030 }
1031 Ok(true)
1032 }
1033
1034 /// Drain a specific shard by id, transitioning it from `Active`
1035 /// to `Draining`.
1036 ///
1037 /// Companion to `ShardManager::drain_shard`. The previous
1038 /// implementation only flipped the metrics collector's `draining`
1039 /// atomic; this version atomically updates `MappedShard.state`
1040 /// (so `select_shard` stops routing to the shard) and decrements
1041 /// `active_count` (so `evaluate_scaling`'s budget math stays
1042 /// consistent with `scale_down`). Returns an error if the shard
1043 /// is not in `Active` state, or if doing so would push the active
1044 /// count below `min_shards`.
1045 pub fn drain_specific(&self, shard_id: u16) -> Result<(), ScalingError> {
1046 let mut shards = self.shards.write();
1047 let current_active = self.active_count.load(AtomicOrdering::Acquire);
1048 if current_active <= self.policy.min_shards {
1049 return Err(ScalingError::AtMinShards);
1050 }
1051 let shard = shards
1052 .iter_mut()
1053 .find(|s| s.id == shard_id)
1054 .ok_or_else(|| {
1055 ScalingError::InvalidPolicy(format!("drain_specific: shard {} not found", shard_id))
1056 })?;
1057 match shard.state {
1058 ShardState::Active => {
1059 shard.state = ShardState::Draining;
1060 shard.drain_started = Some(Instant::now());
1061 shard.metrics.set_draining(true);
1062 }
1063 ShardState::Draining => return Ok(()),
1064 ShardState::Provisioning | ShardState::Stopped => {
1065 return Err(ScalingError::InvalidPolicy(format!(
1066 "drain_specific: shard {} is in state {:?}, cannot drain",
1067 shard_id, shard.state
1068 )));
1069 }
1070 }
1071 drop(shards);
1072 self.active_count.fetch_sub(1, AtomicOrdering::Release);
1073 // Bump `last_scaling` so a subsequent `scale_up` is gated
1074 // by the cooldown floor. Pre-fix `drain_specific` removed
1075 // a shard from Active without touching `last_scaling`, so
1076 // the sequence `drain_specific(id) → scale_up(N)`
1077 // bypassed the cooldown — `scale_down` writes
1078 // `last_scaling` precisely for this reason. From the
1079 // budget-math perspective `drain_specific` IS a scale-
1080 // down (it decrements `active_count` and trips the
1081 // `min_shards` floor), so it should also gate
1082 // re-expansion the same way.
1083 *self.last_scaling.write() = Some(Instant::now());
1084 Ok(())
1085 }
1086
1087 /// Start draining shards for scale-down.
1088 ///
1089 /// Marks shards as draining so they stop receiving new events.
1090 /// Shards will be removed once they're empty.
1091 pub fn scale_down(&self, count: u16) -> Result<Vec<u16>, ScalingError> {
1092 // Early checks (may race, but avoid acquiring the write lock)
1093 let current = self.active_count.load(AtomicOrdering::Acquire);
1094 if current <= self.policy.min_shards {
1095 return Err(ScalingError::AtMinShards);
1096 }
1097
1098 let to_drain = count.min(current - self.policy.min_shards);
1099 if to_drain == 0 {
1100 return Err(ScalingError::AtMinShards);
1101 }
1102 {
1103 let last = self.last_scaling.read();
1104 if let Some(ts) = *last {
1105 if ts.elapsed() < self.policy.cooldown {
1106 return Err(ScalingError::InCooldown);
1107 }
1108 }
1109 }
1110
1111 let mut shards = self.shards.write();
1112
1113 // Re-check under the lock to prevent race conditions (double-check pattern)
1114 let current = self.active_count.load(AtomicOrdering::Acquire);
1115 if current <= self.policy.min_shards {
1116 return Err(ScalingError::AtMinShards);
1117 }
1118 let to_drain = count.min(current - self.policy.min_shards);
1119 if to_drain == 0 {
1120 return Err(ScalingError::AtMinShards);
1121 }
1122 // Re-check cooldown under the same write lock that gates
1123 // mutation — see the matching note in `scale_up`.
1124 {
1125 let last = self.last_scaling.read();
1126 if let Some(ts) = *last {
1127 if ts.elapsed() < self.policy.cooldown {
1128 return Err(ScalingError::InCooldown);
1129 }
1130 }
1131 }
1132
1133 let mut drained_ids = Vec::with_capacity(to_drain as usize);
1134
1135 // Find shards with lowest weight (least utilized) to drain
1136 let mut active_indices: Vec<_> = shards
1137 .iter()
1138 .enumerate()
1139 .filter(|(_, s)| s.state == ShardState::Active)
1140 .map(|(i, s)| (i, s.last_metrics.weight))
1141 .collect();
1142
1143 // Sort by weight (ascending - drain least utilized first)
1144 active_indices.sort_by(|a, b| a.1.total_cmp(&b.1));
1145
1146 // Mark shards for draining
1147 for (idx, _) in active_indices.into_iter().take(to_drain as usize) {
1148 shards[idx].state = ShardState::Draining;
1149 shards[idx].drain_started = Some(Instant::now());
1150 shards[idx].metrics.set_draining(true);
1151 drained_ids.push(shards[idx].id);
1152 }
1153
1154 // Update count
1155 self.active_count
1156 .fetch_sub(to_drain, AtomicOrdering::Release);
1157 *self.last_scaling.write() = Some(Instant::now());
1158
1159 Ok(drained_ids)
1160 }
1161
1162 /// Check draining shards and finalize those that are empty.
1163 ///
1164 /// Returns IDs of shards that were stopped.
1165 ///
1166 /// This predicate looks ONLY at the ring buffer
1167 /// (`current_len` + `pushes_since_drain_start`); it does NOT
1168 /// probe the per-shard mpsc channel or the BatchWorker's
1169 /// `current_batch`. A shard that the predicate flags as empty
1170 /// can still have events queued in those two places. The
1171 /// correctness gate is therefore `bus::remove_shard_internal`,
1172 /// which awaits the BatchWorker's `JoinHandle` before
1173 /// constructing the stranded-flush batch — see that function's
1174 /// step 3 for the rationale. Tightening this predicate is a
1175 /// defense-in-depth follow-up; a stricter ring-buffer-empty
1176 /// signal here would only narrow an already-closed window.
1177 pub fn finalize_draining(&self) -> Vec<u16> {
1178 let mut shards = self.shards.write();
1179 let mut stopped = Vec::new();
1180
1181 for shard in shards.iter_mut() {
1182 if shard.state == ShardState::Draining {
1183 // Check if shard is empty by reading current_len directly,
1184 // avoiding collect_and_reset() which destructively zeros all counters.
1185 let current_len = shard.metrics.current_len.load(AtomicOrdering::Relaxed);
1186 let fill_ratio = if shard.metrics.capacity > 0 {
1187 current_len as f64 / shard.metrics.capacity as f64
1188 } else {
1189 0.0
1190 };
1191 // Previously read `events_in_window` here, which
1192 // `collect_and_reset` zeros every metrics tick. A
1193 // producer push that landed in the window between two
1194 // ticks could be silently zeroed out, so a draining
1195 // shard whose buffer transiently emptied was finalized
1196 // with a producer still attached.
1197 // `pushes_since_drain_start` is a separate counter
1198 // that is only reset by `set_draining(true)`, so any
1199 // push observed since the drain began is sticky —
1200 // exactly the signal we want.
1201 // Acquire pairs with `set_draining`'s SeqCst reset so
1202 // the load can't observe a stale value from before the
1203 // drain began. A Relaxed load here let weakly-ordered
1204 // hardware see the pre-reset count and finalize while
1205 // a producer was still pushing.
1206 let pushes_after_drain = shard.metrics.pushes_since_drain_start();
1207 if fill_ratio == 0.0 && pushes_after_drain == 0 {
1208 // Check if we've waited long enough
1209 if let Some(drain_start) = shard.drain_started {
1210 if drain_start.elapsed() > Duration::from_millis(100) {
1211 shard.state = ShardState::Stopped;
1212 stopped.push(shard.id);
1213 }
1214 }
1215 }
1216 }
1217 }
1218
1219 // Drop the write lock BEFORE notifying. The callback is
1220 // user-supplied and may re-enter the mapper (`shard_state`,
1221 // `select_shard`, `metrics_collector`, …), each of which
1222 // acquires `shards.read()`. `parking_lot::RwLock` is not
1223 // recursive, so a re-entrant read attempt while we hold a
1224 // write would deadlock. `scale_up`'s callback path already
1225 // releases its lock before calling out — mirror that
1226 // here.
1227 drop(shards);
1228
1229 if !stopped.is_empty() {
1230 if let Some(callback) = self.on_shard_removed.read().as_ref() {
1231 for &id in &stopped {
1232 callback(id);
1233 }
1234 }
1235 }
1236
1237 stopped
1238 }
1239
1240 /// Remove a specific shard from the mapper if it is in the
1241 /// `Stopped` state. Used by `ShardManager::remove_shard` so a
1242 /// per-shard cleanup doesn't disturb sibling `Stopped`
1243 /// entries — which a sequential `manual_scale_down` loop
1244 /// still needs to look up state for. Returns `true` if the
1245 /// shard existed and was Stopped (and was removed).
1246 pub fn remove_specific_stopped_shard(&self, shard_id: u16) -> bool {
1247 let mut shards = self.shards.write();
1248 let before = shards.len();
1249 shards.retain(|s| !(s.id == shard_id && s.state == ShardState::Stopped));
1250 shards.len() < before
1251 }
1252
1253 /// Remove stopped shards from the mapper.
1254 pub fn remove_stopped_shards(&self) -> Vec<u16> {
1255 let mut shards = self.shards.write();
1256 let before = shards.len();
1257 let removed: Vec<u16> = shards
1258 .iter()
1259 .filter(|s| s.state == ShardState::Stopped)
1260 .map(|s| s.id)
1261 .collect();
1262
1263 shards.retain(|s| s.state != ShardState::Stopped);
1264
1265 if shards.len() < before {
1266 tracing::info!(
1267 removed = removed.len(),
1268 remaining = shards.len(),
1269 "Removed stopped shards"
1270 );
1271 }
1272
1273 removed
1274 }
1275
1276 /// Get the state of a specific shard.
1277 pub fn shard_state(&self, shard_id: u16) -> Option<ShardState> {
1278 self.shards
1279 .read()
1280 .iter()
1281 .find(|s| s.id == shard_id)
1282 .map(|s| s.state)
1283 }
1284
1285 /// Get all active shard IDs.
1286 pub fn active_shard_ids(&self) -> Vec<u16> {
1287 self.shards
1288 .read()
1289 .iter()
1290 .filter(|s| s.state == ShardState::Active)
1291 .map(|s| s.id)
1292 .collect()
1293 }
1294
1295 /// Get all shard IDs (including draining).
1296 pub fn all_shard_ids(&self) -> Vec<u16> {
1297 self.shards
1298 .read()
1299 .iter()
1300 .filter(|s| s.state != ShardState::Stopped)
1301 .map(|s| s.id)
1302 .collect()
1303 }
1304
1305 /// Get the scaling policy.
1306 pub fn policy(&self) -> &ScalingPolicy {
1307 &self.policy
1308 }
1309 // `set_policy` previously took `&mut self` and was unreachable
1310 // through the `Arc<ShardMapper>` that the production code holds
1311 // (`Arc::get_mut` fails once the worker pool has cloned the Arc).
1312 // The method has been removed — recreate the mapper / bus to
1313 // change the policy.
1314}
1315
1316#[cfg(test)]
1317mod tests {
1318 use super::*;
1319
1320 #[test]
1321 fn test_shard_mapper_creation() {
1322 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1323 assert_eq!(mapper.active_shard_count(), 4);
1324 assert_eq!(mapper.total_shard_count(), 4);
1325 }
1326
1327 #[test]
1328 fn test_select_shard_distributes() {
1329 let mapper = ShardMapper::new(4, 1024, ScalingPolicy::default()).unwrap();
1330
1331 // Different hashes should potentially select different shards
1332 let mut selected = std::collections::HashSet::new();
1333 for i in 0..100u64 {
1334 let shard = mapper.select_shard(i * 12345);
1335 selected.insert(shard);
1336 }
1337
1338 // With 4 shards, we should hit multiple
1339 assert!(!selected.is_empty());
1340 }
1341
1342 /// `select_shard`'s candidate-index computation must
1343 /// be unbiased across the u64 hash space. Pre-fix used
1344 /// `hash as usize % candidates.len()`, which over-weights low
1345 /// indices when `candidates.len()` is not a power of two.
1346 /// With u64 hashes the bias is small but non-zero and
1347 /// sustains a hot-shard skew over time. Lemire's
1348 /// `(hash * len) >> 64` is unbiased.
1349 ///
1350 /// We test the unbiased property by sampling a uniform
1351 /// distribution of u64 hashes over 3 candidate shards and
1352 /// asserting each bucket gets close to 1/3 of the picks.
1353 /// Empirical bound: ±5% across 30 000 trials with a
1354 /// well-distributed input.
1355 #[test]
1356 fn select_shard_distribution_is_unbiased() {
1357 // 3 candidates: a non-power-of-2 to expose the modulo
1358 // bias the fix removes.
1359 let mapper = ShardMapper::new(3, 1024, ScalingPolicy::default()).unwrap();
1360
1361 let trials = 30_000u64;
1362 // Spread inputs uniformly across the u64 range so the
1363 // multiply-shift mapping behaves as designed.
1364 let stride = u64::MAX / trials;
1365 let mut counts = [0u64; 3];
1366 for i in 0..trials {
1367 let h = i.wrapping_mul(stride);
1368 let id = mapper.select_shard(h);
1369 counts[id as usize] += 1;
1370 }
1371
1372 let expected = (trials / 3) as i64;
1373 for (id, &count) in counts.iter().enumerate() {
1374 let diff = (count as i64 - expected).abs();
1375 let pct = (diff as f64 / expected as f64) * 100.0;
1376 assert!(
1377 pct < 5.0,
1378 "shard {} bucket has {} hits ({:.2}% off expected {}); \
1379 modulo bias would drift higher on certain shards",
1380 id,
1381 count,
1382 pct,
1383 expected
1384 );
1385 }
1386 }
1387
1388 #[test]
1389 fn test_scale_up() {
1390 // Explicitly set max_shards to allow scaling from 2 to 4
1391 let policy = ScalingPolicy {
1392 max_shards: 8,
1393 ..Default::default()
1394 };
1395 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1396
1397 let new_ids = mapper.scale_up(2).unwrap();
1398 assert_eq!(new_ids.len(), 2);
1399 assert_eq!(mapper.active_shard_count(), 4);
1400 }
1401
1402 #[test]
1403 fn test_scale_up_max_limit() {
1404 let policy = ScalingPolicy {
1405 max_shards: 4,
1406 ..Default::default()
1407 };
1408 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1409
1410 let result = mapper.scale_up(1);
1411 assert!(matches!(result, Err(ScalingError::AtMaxShards)));
1412 }
1413
1414 #[test]
1415 fn test_scale_down() {
1416 let policy = ScalingPolicy {
1417 min_shards: 1,
1418 cooldown: Duration::from_nanos(1), // Disable cooldown for test
1419 ..Default::default()
1420 };
1421 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1422
1423 let drained = mapper.scale_down(2).unwrap();
1424 assert_eq!(drained.len(), 2);
1425 assert_eq!(mapper.active_shard_count(), 2);
1426 }
1427
1428 #[test]
1429 fn test_scale_down_min_limit() {
1430 let policy = ScalingPolicy {
1431 min_shards: 4,
1432 ..Default::default()
1433 };
1434 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
1435
1436 let result = mapper.scale_down(1);
1437 assert!(matches!(result, Err(ScalingError::AtMinShards)));
1438 }
1439
1440 #[test]
1441 fn test_metrics_collection() {
1442 let mapper = ShardMapper::new(2, 1024, ScalingPolicy::default()).unwrap();
1443
1444 // Record some metrics
1445 if let Some(collector) = mapper.metrics_collector(0) {
1446 collector.record_buffer_len(512);
1447 collector.record_push(5);
1448 collector.record_push(10);
1449 }
1450
1451 let metrics = mapper.collect_metrics();
1452 assert_eq!(metrics.len(), 2);
1453
1454 let shard0_metrics = metrics.iter().find(|m| m.shard_id == 0).unwrap();
1455 assert!(shard0_metrics.fill_ratio > 0.0);
1456 }
1457
1458 /// Regression: a `record_push` / `record_flush` interleaving
1459 /// with a `collect_and_reset` swap must NOT desync `(sum,
1460 /// count)`. Pre-fix `push_latency_sum_ns` and `push_count`
1461 /// were independent atomics; a tick between the two
1462 /// `fetch_add`s captured the sum without the matching count
1463 /// (or the count without the sum). The resulting `avg =
1464 /// sum.checked_div(count).unwrap_or(0)` returned 0 in window
1465 /// N (sum without count) AND 0 in window N+1 (count without
1466 /// sum) — silently zeroing the average that drives
1467 /// `evaluate_scaling`'s push-latency scale-up trigger.
1468 ///
1469 /// Post-fix `(sum, count)` is packed into one
1470 /// `AtomicU64` so the swap captures both atomically. This
1471 /// test fires N concurrent `record_push` calls and a single
1472 /// `collect_and_reset` and asserts the captured count
1473 /// matches the captured sum (i.e. `sum >= count` because
1474 /// every push contributes at least 1 ns; `sum / count` is
1475 /// well-defined for any non-zero count).
1476 #[test]
1477 fn record_push_collect_no_sum_count_desync() {
1478 use std::sync::Barrier;
1479 use std::thread;
1480
1481 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1482 const PUSHERS: usize = 4;
1483 const PUSHES_PER_THREAD: usize = 1_000;
1484
1485 let barrier = Arc::new(Barrier::new(PUSHERS + 1));
1486 let mut handles = vec![];
1487 for _ in 0..PUSHERS {
1488 let c = collector.clone();
1489 let b = barrier.clone();
1490 handles.push(thread::spawn(move || {
1491 b.wait();
1492 for i in 0..PUSHES_PER_THREAD {
1493 // Vary latencies so sum/count averages
1494 // aren't trivially the same number.
1495 c.record_push((i as u64 % 100) + 1);
1496 }
1497 }));
1498 }
1499
1500 // Race a collect_and_reset across the pushers.
1501 barrier.wait();
1502 let snapshot1 = collector.collect_and_reset();
1503 for h in handles {
1504 h.join().unwrap();
1505 }
1506 // After all threads finish, drain whatever remains.
1507 let snapshot2 = collector.collect_and_reset();
1508
1509 // Reconstruct count and sum from the two snapshots.
1510 // We can't easily expose the packed atomic, so we use
1511 // the per-window averages and event counts as a
1512 // consistency check.
1513 let total_events = snapshot1.event_rate + snapshot2.event_rate;
1514 assert_eq!(
1515 total_events as usize,
1516 PUSHERS * PUSHES_PER_THREAD,
1517 "all pushes must be accounted for"
1518 );
1519
1520 // For each window: if event_rate > 0, avg_push_latency
1521 // must be > 0 (non-zero average) — pre-fix the desync
1522 // could land event_rate > 0 with avg = 0 (count
1523 // captured without sum, or sum without count → div-by-
1524 // zero clamped to 0). This is the directly visible
1525 // symptom of the desync.
1526 //
1527 // events_in_window is incremented separately from the
1528 // packed (sum, count) word, so a strict assertion of
1529 // "event_rate is exactly count" isn't safe — but the
1530 // weaker invariant "if any pushes were captured in the
1531 // (sum,count) word, the average is non-zero" survives
1532 // the packed-atomic fix.
1533 for snap in [&snapshot1, &snapshot2] {
1534 if snap.avg_push_latency_ns == 0 {
1535 // Either no pushes were captured in this
1536 // window, or — pre-fix — sum/count desynced.
1537 // The post-fix shape can only produce
1538 // avg=0 when count is also 0; we can't read
1539 // count directly from ShardMetrics, but
1540 // exercise the invariant by confirming the
1541 // OTHER window's sum is consistent with all
1542 // pushes.
1543 continue;
1544 }
1545 assert!(
1546 snap.avg_push_latency_ns >= 1,
1547 "regression: a window with non-zero avg must have \
1548 a positive sum (pre-fix sum-without-count desync \
1549 produced avg=0 with non-zero events)"
1550 );
1551 }
1552 }
1553
1554 #[test]
1555 fn test_draining_excludes_from_selection() {
1556 let policy = ScalingPolicy {
1557 min_shards: 1,
1558 cooldown: Duration::from_nanos(1),
1559 ..Default::default()
1560 };
1561 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1562
1563 // Drain one shard
1564 let drained = mapper.scale_down(1).unwrap();
1565 assert_eq!(drained.len(), 1);
1566
1567 // All selections should go to the remaining active shard
1568 let active_ids = mapper.active_shard_ids();
1569 assert_eq!(active_ids.len(), 1);
1570
1571 for i in 0..100u64 {
1572 let selected = mapper.select_shard(i);
1573 assert!(active_ids.contains(&selected));
1574 }
1575 }
1576
1577 #[test]
1578 fn test_policy_validation() {
1579 let invalid_policy = ScalingPolicy {
1580 fill_ratio_threshold: 1.5, // Invalid
1581 ..Default::default()
1582 };
1583 assert!(invalid_policy.validate().is_err());
1584
1585 // Without normalize(), this would be invalid
1586 let invalid_policy2 = ScalingPolicy {
1587 min_shards: 10,
1588 max_shards: 5,
1589 ..Default::default()
1590 };
1591 assert!(invalid_policy2.validate().is_err());
1592 }
1593
1594 #[test]
1595 fn test_policy_normalize_auto_adjusts_max_shards() {
1596 // When min_shards > max_shards, normalize() should adjust max_shards
1597 let policy = ScalingPolicy {
1598 min_shards: 8,
1599 max_shards: 2, // Less than min_shards
1600 ..Default::default()
1601 };
1602
1603 let normalized = policy.normalize();
1604 assert_eq!(
1605 normalized.max_shards, 8,
1606 "max_shards should be adjusted to min_shards"
1607 );
1608 assert!(
1609 normalized.validate().is_ok(),
1610 "normalized policy should be valid"
1611 );
1612 }
1613
1614 /// Regression: BUG_REPORT.md #7 — `scale_up` previously allocated
1615 /// new shard ids as `shards.iter().max() + 1`, which reused ids
1616 /// after the highest-numbered shard was drained-and-removed.
1617 /// Reusing ids merges two distinct shard lifetimes in any
1618 /// external metric/checkpoint system that keys on shard id.
1619 /// The fix uses a monotonic `next_shard_id` counter.
1620 #[test]
1621 fn scale_up_does_not_reuse_ids_after_remove() {
1622 let policy = ScalingPolicy {
1623 min_shards: 1,
1624 max_shards: 16,
1625 cooldown: Duration::from_nanos(1),
1626 ..Default::default()
1627 };
1628 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
1629
1630 // Initial ids are 0 and 1. Scale up to 4 — new ids must be
1631 // 2 and 3 (the next two slots from the monotonic counter).
1632 let new_ids = mapper.scale_up(2).unwrap();
1633 assert_eq!(new_ids, vec![2, 3]);
1634
1635 // Drain one shard. `scale_down` picks by lowest weight, so
1636 // we don't get to choose which id is drained. Whichever it
1637 // is, we then force it through Stopped + remove and check
1638 // that the removed id is *not* reissued by the next
1639 // scale_up.
1640 let drained = mapper.scale_down(1).unwrap();
1641 let drained_id = drained[0];
1642 for shard in mapper.shards.write().iter_mut() {
1643 if shard.id == drained_id {
1644 shard.state = ShardState::Stopped;
1645 }
1646 }
1647 let removed = mapper.remove_stopped_shards();
1648 assert_eq!(removed, vec![drained_id]);
1649
1650 // Scale up by 1. The broken `max(existing_ids) + 1` allocator
1651 // could revive the just-removed id whenever `drained_id` had
1652 // been the highest id present (e.g. id 3 with 0/1/2/3 → 0/1/2
1653 // → next would be 3 again). The fix uses the monotonic
1654 // counter, which sits at 4 after the earlier scale_up, so
1655 // the new id must be 4 regardless of which id was drained.
1656 let new_ids = mapper.scale_up(1).unwrap();
1657 assert_eq!(
1658 new_ids,
1659 vec![4],
1660 "shard id {drained_id} was just removed; reusing any \
1661 previously-issued id would merge two distinct shard \
1662 lifetimes in external systems"
1663 );
1664 }
1665
1666 #[test]
1667 fn test_policy_normalize_preserves_valid_config() {
1668 // When max_shards >= min_shards, normalize() should not change anything
1669 let policy = ScalingPolicy {
1670 min_shards: 4,
1671 max_shards: 16,
1672 ..Default::default()
1673 };
1674
1675 let normalized = policy.normalize();
1676 assert_eq!(normalized.min_shards, 4);
1677 assert_eq!(normalized.max_shards, 16);
1678 }
1679
1680 #[test]
1681 fn test_shard_mapper_normalizes_policy() {
1682 // ShardMapper should accept a policy where min_shards > default max_shards
1683 // because it calls normalize() internally
1684 let policy = ScalingPolicy {
1685 min_shards: 4,
1686 ..Default::default()
1687 };
1688
1689 // This should succeed even on machines with < 4 CPUs
1690 let result = ShardMapper::new(4, 1024, policy);
1691 assert!(
1692 result.is_ok(),
1693 "ShardMapper should normalize policy automatically"
1694 );
1695 }
1696
1697 #[test]
1698 fn test_shard_mapper_adjusts_max_shards_to_initial_count() {
1699 // ShardMapper should adjust max_shards to accommodate initial_shards
1700 // even if initial_shards > default max_shards (CPU count)
1701 let policy = ScalingPolicy::default();
1702
1703 // Create mapper with 8 initial shards - should work even on 2-core machines
1704 let result = ShardMapper::new(8, 1024, policy);
1705 assert!(
1706 result.is_ok(),
1707 "ShardMapper should adjust max_shards to initial_shards"
1708 );
1709
1710 let mapper = result.unwrap();
1711 assert_eq!(mapper.active_shard_count(), 8);
1712
1713 // Verify the policy was adjusted
1714 assert!(
1715 mapper.policy().max_shards >= 8,
1716 "max_shards should be at least initial_shards"
1717 );
1718 }
1719
1720 #[test]
1721 fn test_scale_up_max_shards_concurrent() {
1722 use std::sync::Arc;
1723 use std::thread;
1724
1725 let policy = ScalingPolicy {
1726 max_shards: 10,
1727 cooldown: Duration::from_nanos(1), // Disable cooldown for test
1728 ..Default::default()
1729 };
1730 let mapper = Arc::new(ShardMapper::new(5, 1024, policy).unwrap());
1731
1732 // Spawn multiple threads that all try to scale up
1733 let mut handles = vec![];
1734 for _ in 0..5 {
1735 let mapper_clone = mapper.clone();
1736 handles.push(thread::spawn(move || mapper_clone.scale_up(3)));
1737 }
1738
1739 // Collect results
1740 let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
1741
1742 // Some should succeed, some should fail with AtMaxShards
1743 let successes: Vec<_> = results.iter().filter(|r| r.is_ok()).collect();
1744 let failures: Vec<_> = results
1745 .iter()
1746 .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
1747 .collect();
1748
1749 // We started with 5, max is 10, each tries to add 3
1750 // At most we can add 5 more shards, so at most 1-2 can succeed
1751 assert!(
1752 !successes.is_empty() || !failures.is_empty(),
1753 "at least some operations should complete"
1754 );
1755
1756 // Final count should not exceed max_shards
1757 assert!(
1758 mapper.active_shard_count() <= 10,
1759 "should never exceed max_shards, got {}",
1760 mapper.active_shard_count()
1761 );
1762 }
1763
1764 /// Multiple `scale_up_provisioning(1)` calls must
1765 /// never push `active_count` past `max_shards` via subsequent
1766 /// `activate()` calls. Pre-fix the budget gate
1767 /// (`check_scale_up_budget`) only counted ALREADY-active
1768 /// shards, so several `scale_up_provisioning` calls could each
1769 /// pass and then each `activate()` unconditionally bumped
1770 /// `active_count` past the cap.
1771 ///
1772 /// Setup: at the budget edge, allocate two Provisioning shards
1773 /// in sequence (both pass the gate because `active_count`
1774 /// hasn't moved). The first `activate()` fills the budget;
1775 /// the second must surface `AtMaxShards` rather than overflow.
1776 #[test]
1777 fn activate_rejects_when_active_count_would_exceed_max_shards() {
1778 let policy = ScalingPolicy {
1779 min_shards: 1,
1780 max_shards: 4,
1781 cooldown: Duration::from_nanos(1),
1782 ..Default::default()
1783 };
1784 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
1785 // active_count = 3, max = 4. Allocate TWO Provisioning
1786 // shards; both pass the budget gate (it sees active_count=3).
1787 let ids_a = mapper.scale_up_provisioning(1).unwrap();
1788 // The 1ns cooldown elapses on every realistic scheduler
1789 // tick; if we lose that race, retry. Release-mode and
1790 // some debug-build paths occasionally finish the first
1791 // allocation in <1ns of wall time.
1792 let ids_b = loop {
1793 match mapper.scale_up_provisioning(1) {
1794 Ok(v) => break v,
1795 Err(ScalingError::InCooldown) => {
1796 std::thread::sleep(Duration::from_nanos(50));
1797 }
1798 Err(e) => panic!("unexpected error from scale_up_provisioning: {:?}", e),
1799 }
1800 };
1801 assert_eq!(ids_a.len(), 1);
1802 assert_eq!(ids_b.len(), 1);
1803
1804 // First activate succeeds (active_count goes 3 → 4 = max).
1805 mapper
1806 .activate(ids_a[0])
1807 .expect("first activate must succeed");
1808 assert_eq!(mapper.active_shard_count(), 4);
1809
1810 // Second activate must REFUSE — it would push past max.
1811 let err = mapper
1812 .activate(ids_b[0])
1813 .expect_err("second activate must reject");
1814 assert!(
1815 matches!(err, ScalingError::AtMaxShards),
1816 "expected AtMaxShards, got {:?}",
1817 err
1818 );
1819 // active_count must still be at the cap, not over it.
1820 assert_eq!(mapper.active_shard_count(), 4);
1821 }
1822
1823 /// Pin: under contention, the active_count never transiently
1824 /// exceeds `max_shards`. Pre-fix `activate` released the
1825 /// shards write-lock BEFORE the `fetch_add(1)`, so two
1826 /// concurrent activators could each pass the budget gate
1827 /// (both reading the pre-bump count) and both bump,
1828 /// overshooting the cap by 1 at any observation point.
1829 #[test]
1830 fn concurrent_activate_never_exceeds_max_shards() {
1831 use std::sync::Arc;
1832 use std::sync::Barrier;
1833 use std::thread;
1834
1835 const ITERATIONS: usize = 200;
1836
1837 for iter in 0..ITERATIONS {
1838 let policy = ScalingPolicy {
1839 min_shards: 1,
1840 max_shards: 4,
1841 cooldown: Duration::from_nanos(1),
1842 ..Default::default()
1843 };
1844 // Start at active=3, allocate two Provisioning ids so
1845 // both threads have a candidate to activate. With the
1846 // pre-fix race: thread A loads count=3, validates,
1847 // sets state Active, drops lock; thread B loads count
1848 // (still 3), validates, sets state Active, drops lock;
1849 // A bumps → 4; B bumps → 5. Post-fix B observes
1850 // count=4 inside its lock and rejects with
1851 // AtMaxShards.
1852 let mapper = Arc::new(ShardMapper::new(3, 1024, policy).unwrap());
1853 let ids_a = mapper.scale_up_provisioning(1).unwrap();
1854 // The 1ns cooldown elapses on every realistic
1855 // scheduler tick; if we lose that race, retry once
1856 // (release-mode iterations occasionally finish the
1857 // first allocation in <1ns of wall time).
1858 let ids_b = loop {
1859 match mapper.scale_up_provisioning(1) {
1860 Ok(v) => break v,
1861 Err(ScalingError::InCooldown) => {
1862 std::thread::sleep(Duration::from_micros(10));
1863 continue;
1864 }
1865 Err(e) => panic!("unexpected error: {:?}", e),
1866 }
1867 };
1868 assert_eq!(ids_a.len(), 1);
1869 assert_eq!(ids_b.len(), 1);
1870
1871 let barrier = Arc::new(Barrier::new(2));
1872 let m1 = mapper.clone();
1873 let m2 = mapper.clone();
1874 let b1 = barrier.clone();
1875 let b2 = barrier.clone();
1876 let id_a = ids_a[0];
1877 let id_b = ids_b[0];
1878
1879 let h1 = thread::spawn(move || {
1880 b1.wait();
1881 m1.activate(id_a)
1882 });
1883 let h2 = thread::spawn(move || {
1884 b2.wait();
1885 m2.activate(id_b)
1886 });
1887 let r1 = h1.join().expect("thread A panicked");
1888 let r2 = h2.join().expect("thread B panicked");
1889
1890 // Exactly one must succeed and one must reject with
1891 // AtMaxShards.
1892 let ok_count = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
1893 let at_max_count = [&r1, &r2]
1894 .iter()
1895 .filter(|r| matches!(r, Err(ScalingError::AtMaxShards)))
1896 .count();
1897 assert_eq!(
1898 ok_count, 1,
1899 "iter {}: expected exactly one Ok, got r1={:?} r2={:?}",
1900 iter, r1, r2
1901 );
1902 assert_eq!(
1903 at_max_count, 1,
1904 "iter {}: expected exactly one AtMaxShards, got r1={:?} r2={:?}",
1905 iter, r1, r2
1906 );
1907
1908 // active_count must not exceed max_shards at any
1909 // observation point.
1910 assert!(
1911 mapper.active_shard_count() <= 4,
1912 "iter {}: active_count={} exceeded max_shards=4",
1913 iter,
1914 mapper.active_shard_count(),
1915 );
1916 }
1917 }
1918
1919 /// Two concurrent `scale_up(1)` calls must never both succeed
1920 /// inside a single cooldown window. Before the fix, the
1921 /// cooldown check happened only under a read lock that was
1922 /// released *before* the mutating write lock, so two threads
1923 /// could both observe `last_scaling=None` (or stale), both
1924 /// acquire the write lock in turn, and both succeed — racing
1925 /// past the cooldown floor and (on a max-shard-bounded
1926 /// scenario) potentially the `max_shards` cap.
1927 ///
1928 /// Pin: across `ITERATIONS` rounds of two-thread races, every
1929 /// iteration sees exactly one success and one `InCooldown`.
1930 #[test]
1931 fn cooldown_is_enforced_under_concurrent_scale_up() {
1932 use std::sync::Arc;
1933 use std::sync::Barrier;
1934 use std::thread;
1935
1936 const ITERATIONS: usize = 1_000;
1937
1938 for iter in 0..ITERATIONS {
1939 let policy = ScalingPolicy {
1940 min_shards: 1,
1941 max_shards: 16,
1942 // Large cooldown so a single iteration can't pass
1943 // the floor between the two threads' calls.
1944 cooldown: Duration::from_secs(60),
1945 ..Default::default()
1946 };
1947 let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
1948 let barrier = Arc::new(Barrier::new(2));
1949
1950 let m1 = mapper.clone();
1951 let b1 = barrier.clone();
1952 let m2 = mapper.clone();
1953 let b2 = barrier.clone();
1954
1955 let h1 = thread::spawn(move || {
1956 b1.wait();
1957 m1.scale_up(1)
1958 });
1959 let h2 = thread::spawn(move || {
1960 b2.wait();
1961 m2.scale_up(1)
1962 });
1963
1964 let r1 = h1.join().unwrap();
1965 let r2 = h2.join().unwrap();
1966
1967 let oks = [&r1, &r2].iter().filter(|r| r.is_ok()).count();
1968 let cooldowns = [&r1, &r2]
1969 .iter()
1970 .filter(|r| matches!(r, Err(ScalingError::InCooldown)))
1971 .count();
1972
1973 assert_eq!(
1974 oks, 1,
1975 "iter {iter}: expected exactly one Ok, got r1={r1:?}, r2={r2:?}"
1976 );
1977 assert_eq!(
1978 cooldowns, 1,
1979 "iter {iter}: expected exactly one InCooldown, got r1={r1:?}, r2={r2:?}"
1980 );
1981 assert_eq!(
1982 mapper.active_shard_count(),
1983 3,
1984 "iter {iter}: cooldown violated — both calls mutated state (shard count {})",
1985 mapper.active_shard_count()
1986 );
1987 }
1988 }
1989
1990 #[test]
1991 fn test_scale_up_overflow_protection() {
1992 // Create mapper with a high starting shard ID to test overflow
1993 // protection. The monotonic id allocator advances by 1 per
1994 // shard regardless of which shards have been removed, so we
1995 // bump `next_shard_id` directly to simulate a near-`u16::MAX`
1996 // state.
1997 let policy = ScalingPolicy {
1998 max_shards: u16::MAX,
1999 ..Default::default()
2000 };
2001 let mapper = ShardMapper::new(1, 1024, policy).unwrap();
2002
2003 // Position the allocator so the next id is u16::MAX - 1.
2004 // Trying to allocate 3 ids would need {MAX-1, MAX, MAX+1};
2005 // the last is unrepresentable in `u16`, so scale_up rejects.
2006 mapper
2007 .next_shard_id
2008 .store(u16::MAX - 1, AtomicOrdering::Relaxed);
2009
2010 let result = mapper.scale_up(3);
2011 assert!(matches!(result, Err(ScalingError::AtMaxShards)));
2012
2013 // Adding 1 shard should still work (id = MAX - 1).
2014 let result = mapper.scale_up(1);
2015 assert!(result.is_ok());
2016 }
2017
2018 #[test]
2019 fn test_evaluate_scaling_auto_scale_disabled() {
2020 let policy = ScalingPolicy {
2021 auto_scale: false,
2022 ..Default::default()
2023 };
2024 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2025
2026 let decision = mapper.evaluate_scaling();
2027 assert!(matches!(decision, ScalingDecision::None));
2028 }
2029
2030 #[test]
2031 fn test_evaluate_scaling_in_cooldown() {
2032 let policy = ScalingPolicy {
2033 cooldown: Duration::from_secs(60),
2034 ..Default::default()
2035 };
2036 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2037
2038 // Trigger a scaling operation to start cooldown
2039 *mapper.last_scaling.write() = Some(Instant::now());
2040
2041 let decision = mapper.evaluate_scaling();
2042 assert!(matches!(decision, ScalingDecision::None));
2043 }
2044
2045 #[test]
2046 fn test_evaluate_scaling_scale_up_on_high_fill_ratio() {
2047 let policy = ScalingPolicy {
2048 fill_ratio_threshold: 0.7,
2049 max_shards: 8,
2050 cooldown: Duration::from_nanos(1),
2051 ..Default::default()
2052 };
2053 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2054
2055 // Set high fill ratio on majority of shards
2056 {
2057 let mut shards = mapper.shards.write();
2058 for shard in shards.iter_mut() {
2059 shard.last_metrics.fill_ratio = 0.9; // Above threshold
2060 }
2061 }
2062
2063 let decision = mapper.evaluate_scaling();
2064 assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2065 }
2066
2067 #[test]
2068 fn test_evaluate_scaling_scale_up_on_high_latency() {
2069 let policy = ScalingPolicy {
2070 push_latency_threshold_ns: 10,
2071 max_shards: 8,
2072 cooldown: Duration::from_nanos(1),
2073 ..Default::default()
2074 };
2075 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2076
2077 // Set high push latency on majority of shards
2078 {
2079 let mut shards = mapper.shards.write();
2080 for shard in shards.iter_mut() {
2081 shard.last_metrics.avg_push_latency_ns = 100; // Above threshold
2082 }
2083 }
2084
2085 let decision = mapper.evaluate_scaling();
2086 assert!(matches!(decision, ScalingDecision::ScaleUp(_)));
2087 }
2088
2089 #[test]
2090 fn test_evaluate_scaling_scale_down_on_underutilized() {
2091 let policy = ScalingPolicy {
2092 underutilized_threshold: 0.2,
2093 min_shards: 2,
2094 cooldown: Duration::from_nanos(1),
2095 ..Default::default()
2096 };
2097 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2098
2099 // Set low fill ratio and zero event rate on majority of shards
2100 {
2101 let mut shards = mapper.shards.write();
2102 for shard in shards.iter_mut() {
2103 shard.last_metrics.fill_ratio = 0.05; // Below threshold
2104 shard.last_metrics.event_rate = 0;
2105 }
2106 }
2107
2108 let decision = mapper.evaluate_scaling();
2109 assert!(matches!(decision, ScalingDecision::ScaleDown(_)));
2110 }
2111
2112 /// Regression: a freshly-activated shard must NOT
2113 /// immediately count toward the underutilized tally on the
2114 /// next `evaluate_scaling`. Pre-fix the new shard's
2115 /// `last_metrics` was the `ShardMetrics::new(id)` placeholder
2116 /// (`fill_ratio = 0.0, event_rate = 0`), which matched the
2117 /// underutilized trigger immediately — oscillating the
2118 /// system: scale-up → next tick scale-down → next tick
2119 /// scale-up.
2120 ///
2121 /// Post-fix `MappedShard.activated_at` is stamped at the
2122 /// Provisioning → Active transition (and at scale-up
2123 /// construction for `Active`-from-create shards), and
2124 /// `evaluate_scaling` skips shards within `policy.cooldown`
2125 /// of activation.
2126 #[test]
2127 fn freshly_added_shard_skipped_from_evaluate_scaling_warmup() {
2128 let policy = ScalingPolicy {
2129 underutilized_threshold: 0.2,
2130 min_shards: 1,
2131 // Long cooldown so the warmup window stays open
2132 // throughout the test.
2133 cooldown: Duration::from_secs(60),
2134 ..Default::default()
2135 };
2136 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2137
2138 // Direct manipulation of the shard list: pin two boot
2139 // shards as underutilized (boot stamps put them outside
2140 // the warmup window), and inject one freshly-activated
2141 // shard with `activated_at = now()` whose placeholder
2142 // metrics ALSO trigger the underutilized predicate.
2143 // Without the warmup skip, the count would be 3 of 3
2144 // shards underutilized → scale-down. WITH the warmup
2145 // skip, only the 2 boot shards count, and 2 of 3 still
2146 // exceeds the 3/2 = 1 majority — scale-down still fires
2147 // (driven by the boot shards), but the decisive property
2148 // is that the fresh shard is correctly excluded.
2149 {
2150 let mut shards = mapper.shards.write();
2151 for shard in shards.iter_mut() {
2152 shard.last_metrics.fill_ratio = 0.05;
2153 shard.last_metrics.event_rate = 0;
2154 }
2155 // The third shard is "fresh": stamp activated_at to
2156 // now, simulating a just-activated shard.
2157 shards[2].activated_at = Instant::now();
2158 }
2159
2160 // The fresh shard must satisfy the warmup predicate.
2161 let now = Instant::now();
2162 let warmup_excluded: Vec<u16> = mapper
2163 .shards
2164 .read()
2165 .iter()
2166 .filter(|s| now.duration_since(s.activated_at) < mapper.policy.cooldown)
2167 .map(|s| s.id)
2168 .collect();
2169 assert_eq!(
2170 warmup_excluded,
2171 vec![2u16],
2172 "regression: only shard id 2 (just-stamped) should be \
2173 within the warmup window; boot shards are stamped \
2174 1 hour in the past"
2175 );
2176
2177 // And evaluate_scaling must still produce ScaleDown
2178 // (driven by the 2 boot shards) — the fresh shard's
2179 // placeholder doesn't get to vote.
2180 let decision = mapper.evaluate_scaling();
2181 assert!(
2182 matches!(decision, ScalingDecision::ScaleDown(_)),
2183 "scale-down still fires from the boot shards' real \
2184 underutilization, but driven by 2 of 3 not 3 of 3 — \
2185 got {:?}",
2186 decision,
2187 );
2188 }
2189
2190 #[test]
2191 fn test_evaluate_scaling_no_scale_up_at_max() {
2192 let policy = ScalingPolicy {
2193 fill_ratio_threshold: 0.7,
2194 max_shards: 4, // Already at max
2195 cooldown: Duration::from_nanos(1),
2196 ..Default::default()
2197 };
2198 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2199
2200 // Set high fill ratio
2201 {
2202 let mut shards = mapper.shards.write();
2203 for shard in shards.iter_mut() {
2204 shard.last_metrics.fill_ratio = 0.9;
2205 }
2206 }
2207
2208 let decision = mapper.evaluate_scaling();
2209 assert!(matches!(decision, ScalingDecision::None));
2210 }
2211
2212 #[test]
2213 fn test_evaluate_scaling_no_scale_down_at_min() {
2214 let policy = ScalingPolicy {
2215 underutilized_threshold: 0.2,
2216 min_shards: 4, // Already at min
2217 cooldown: Duration::from_nanos(1),
2218 ..Default::default()
2219 };
2220 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2221
2222 // Set underutilized metrics
2223 {
2224 let mut shards = mapper.shards.write();
2225 for shard in shards.iter_mut() {
2226 shard.last_metrics.fill_ratio = 0.05;
2227 shard.last_metrics.event_rate = 0;
2228 }
2229 }
2230
2231 let decision = mapper.evaluate_scaling();
2232 assert!(matches!(decision, ScalingDecision::None));
2233 }
2234
2235 #[test]
2236 fn test_evaluate_scaling_ignores_draining_shards() {
2237 let policy = ScalingPolicy {
2238 fill_ratio_threshold: 0.7,
2239 max_shards: 8,
2240 cooldown: Duration::from_nanos(1),
2241 ..Default::default()
2242 };
2243 let mapper = ShardMapper::new(4, 1024, policy).unwrap();
2244
2245 // Set high fill ratio but mark shards as draining
2246 {
2247 let mut shards = mapper.shards.write();
2248 for shard in shards.iter_mut() {
2249 shard.last_metrics.fill_ratio = 0.9;
2250 shard.state = ShardState::Draining;
2251 }
2252 }
2253
2254 // Should not scale up since draining shards are ignored
2255 let decision = mapper.evaluate_scaling();
2256 assert!(matches!(decision, ScalingDecision::None));
2257 }
2258
2259 #[test]
2260 fn test_shard_metrics_new() {
2261 let metrics = ShardMetrics::new(5);
2262 assert_eq!(metrics.shard_id, 5);
2263 assert_eq!(metrics.fill_ratio, 0.0);
2264 assert_eq!(metrics.event_rate, 0);
2265 assert!(!metrics.draining);
2266 }
2267
2268 #[test]
2269 fn test_shard_metrics_compute_weight() {
2270 let mut metrics = ShardMetrics::new(0);
2271 metrics.fill_ratio = 0.5;
2272 metrics.avg_push_latency_ns = 100;
2273 metrics.event_rate = 1_000_000;
2274
2275 metrics.compute_weight();
2276 assert!(metrics.weight > 0.0);
2277 }
2278
2279 #[test]
2280 fn test_shard_metrics_draining_max_weight() {
2281 let mut metrics = ShardMetrics::new(0);
2282 metrics.draining = true;
2283 metrics.compute_weight();
2284 assert_eq!(metrics.weight, f64::MAX);
2285 }
2286
2287 #[test]
2288 fn test_scaling_decision_debug() {
2289 let none = ScalingDecision::None;
2290 let up = ScalingDecision::ScaleUp(2);
2291 let down = ScalingDecision::ScaleDown(1);
2292
2293 assert!(format!("{:?}", none).contains("None"));
2294 assert!(format!("{:?}", up).contains("ScaleUp"));
2295 assert!(format!("{:?}", down).contains("ScaleDown"));
2296 }
2297
2298 /// Regression: BUG_REPORT.md #46 — `scale_up_provisioning`
2299 /// allocates a shard but `select_shard` must NOT route to it
2300 /// until `activate` has been called. This is the load-bearing
2301 /// invariant that lets `EventBus::add_shard_internal` wire up
2302 /// drain workers before producers can land in the new ring
2303 /// buffer.
2304 #[test]
2305 fn provisioning_shard_is_not_selectable_until_activated() {
2306 let policy = ScalingPolicy {
2307 min_shards: 1,
2308 max_shards: 16,
2309 cooldown: Duration::from_nanos(1),
2310 ..Default::default()
2311 };
2312 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2313
2314 // Allocate a provisioning shard. Existing ids are 0,1; new
2315 // is 2.
2316 let new_ids = mapper.scale_up_provisioning(1).unwrap();
2317 assert_eq!(new_ids, vec![2]);
2318
2319 // The provisioning shard must NOT appear in active accounting.
2320 assert_eq!(mapper.active_shard_count(), 2);
2321 assert_eq!(mapper.shard_state(2), Some(ShardState::Provisioning));
2322
2323 // Across many hashes, `select_shard` must never pick id 2.
2324 // Spread the input hashes across the u64 range so the
2325 // unbiased Lemire-style mapping actually
2326 // distributes — sequential small ids would all map to
2327 // index 0 because `(small * len) >> 64 = 0`. Production
2328 // callers pass `xxh3_64`-hashed event payloads, which
2329 // are uniform u64s; this scaling mirrors that.
2330 let mut seen: std::collections::HashSet<u16> = std::collections::HashSet::new();
2331 let stride = u64::MAX / 10_000;
2332 for i in 0u64..10_000 {
2333 seen.insert(mapper.select_shard(i.wrapping_mul(stride)));
2334 }
2335 assert!(
2336 !seen.contains(&2),
2337 "provisioning shard 2 was selected — \
2338 producers would push into a ring buffer with no consumer (#46)"
2339 );
2340 assert!(seen == [0, 1].into_iter().collect());
2341
2342 // After `activate`, the shard is selectable.
2343 mapper.activate(2).unwrap();
2344 assert_eq!(mapper.shard_state(2), Some(ShardState::Active));
2345 assert_eq!(mapper.active_shard_count(), 3);
2346
2347 let mut seen_after: std::collections::HashSet<u16> = std::collections::HashSet::new();
2348 let stride = u64::MAX / 10_000;
2349 for i in 0u64..10_000 {
2350 seen_after.insert(mapper.select_shard(i.wrapping_mul(stride)));
2351 }
2352 assert!(
2353 seen_after.contains(&2),
2354 "after activate, shard 2 should be a valid select_shard target"
2355 );
2356 }
2357
2358 /// Regression: BUG_REPORT.md #51 — when no `Active` shard exists
2359 /// (e.g. all shards are Draining or Provisioning), `select_shard`
2360 /// must NOT fall back to a Draining shard. Pushing into a
2361 /// draining shard increments `pushes_since_drain_start` and
2362 /// blocks finalization indefinitely.
2363 #[test]
2364 fn select_shard_does_not_fall_back_to_draining() {
2365 let policy = ScalingPolicy {
2366 min_shards: 0,
2367 max_shards: 8,
2368 cooldown: Duration::from_nanos(1),
2369 ..Default::default()
2370 };
2371 // Force min_shards = 0 via direct construction so we can drain
2372 // every shard.
2373 let mut policy = policy;
2374 policy.min_shards = 1;
2375 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2376
2377 // Force every shard into Draining state directly. We bypass
2378 // `scale_down`'s min_shards floor by mutating the test-only
2379 // accessor; this models a state that can otherwise be
2380 // reached by `drain_specific` calls.
2381 {
2382 let mut shards = mapper.shards.write();
2383 for s in shards.iter_mut() {
2384 s.state = ShardState::Draining;
2385 s.metrics.set_draining(true);
2386 }
2387 }
2388
2389 // The fallback must return the OOB sentinel `u16::MAX` rather
2390 // than a draining shard id (0 or 1). The upstream
2391 // `resolve_idx` path will see no match for `u16::MAX` and
2392 // surface as `Unrouted` (#44), which is the correct signal:
2393 // "no destination, do not push."
2394 for hash in 0u64..1_000 {
2395 let picked = mapper.select_shard(hash);
2396 assert_eq!(
2397 picked,
2398 u16::MAX,
2399 "fallback returned a draining shard ({}); pushes there would \
2400 deadlock finalize_draining (#51)",
2401 picked
2402 );
2403 }
2404 }
2405
2406 /// Regression: BUG_REPORT.md #32 — `scale_up(0)` previously
2407 /// bumped the cooldown timestamp and could spuriously fail at
2408 /// `u16::MAX` even though zero ids were being allocated.
2409 #[test]
2410 fn scale_up_zero_is_a_noop() {
2411 let policy = ScalingPolicy {
2412 cooldown: Duration::from_secs(60),
2413 ..Default::default()
2414 };
2415 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2416 // Pretend we just scaled — cooldown is active.
2417 *mapper.last_scaling.write() = Some(Instant::now());
2418
2419 // scale_up(0) must not return InCooldown and must not bump
2420 // last_scaling.
2421 let before_ts = *mapper.last_scaling.read();
2422 let r = mapper.scale_up(0);
2423 assert!(
2424 r.is_ok(),
2425 "scale_up(0) should succeed as a no-op, got {r:?}"
2426 );
2427 assert_eq!(r.unwrap().len(), 0);
2428 let after_ts = *mapper.last_scaling.read();
2429 assert_eq!(before_ts, after_ts, "scale_up(0) bumped cooldown timestamp");
2430
2431 // Also: position the allocator at u16::MAX and verify a
2432 // count==0 call doesn't trip the sentinel check.
2433 mapper
2434 .next_shard_id
2435 .store(u16::MAX, AtomicOrdering::Relaxed);
2436 assert!(mapper.scale_up(0).is_ok());
2437 }
2438
2439 /// Regression: `drain_specific` must bump `last_scaling` so
2440 /// a subsequent `scale_up` is gated by the cooldown floor.
2441 /// Pre-fix `scale_down` wrote `last_scaling` but
2442 /// `drain_specific` did not, so the sequence
2443 /// `drain_specific(id) → scale_up(N)` bypassed the cooldown
2444 /// — even though both decrement `active_count` and so
2445 /// should be symmetric from the budget-math perspective.
2446 #[test]
2447 fn drain_specific_bumps_last_scaling_so_scale_up_respects_cooldown() {
2448 let policy = ScalingPolicy {
2449 min_shards: 1,
2450 max_shards: 8,
2451 // A cooldown long enough that `Instant::now()` won't
2452 // accidentally elapse it during the test, but short
2453 // enough not to slow CI.
2454 cooldown: Duration::from_secs(60),
2455 ..Default::default()
2456 };
2457 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2458
2459 // Pre-condition: no prior scaling action.
2460 assert!(mapper.last_scaling.read().is_none());
2461
2462 let before = Instant::now();
2463 mapper.drain_specific(0).unwrap();
2464 let after_ts = mapper
2465 .last_scaling
2466 .read()
2467 .expect("drain_specific must record a `last_scaling` timestamp");
2468 // Sanity: the recorded timestamp is in the test window.
2469 assert!(
2470 after_ts >= before,
2471 "last_scaling must be bumped to a current Instant (got {:?}, before was {:?})",
2472 after_ts,
2473 before
2474 );
2475
2476 // The decisive sealed property: a follow-up scale_up
2477 // must trip the cooldown gate. Pre-fix this would have
2478 // succeeded immediately because `last_scaling` was never
2479 // written by `drain_specific`.
2480 let err = mapper
2481 .scale_up(1)
2482 .expect_err("scale_up immediately after drain_specific must hit cooldown");
2483 match err {
2484 ScalingError::InCooldown => {} // expected
2485 other => panic!("expected InCooldown after drain_specific, got {:?}", other),
2486 }
2487 }
2488
2489 /// Regression: BUG_REPORT.md #48 — `drain_specific` must
2490 /// transition the shard's `MappedShard.state` to `Draining` so
2491 /// that `select_shard` stops routing to it. The previous
2492 /// `drain_shard` only flipped a metrics atomic.
2493 #[test]
2494 fn drain_specific_takes_shard_out_of_select() {
2495 let policy = ScalingPolicy {
2496 min_shards: 1,
2497 max_shards: 8,
2498 cooldown: Duration::from_nanos(1),
2499 ..Default::default()
2500 };
2501 let mapper = ShardMapper::new(3, 1024, policy).unwrap();
2502
2503 // Drain shard 0.
2504 mapper.drain_specific(0).unwrap();
2505 assert_eq!(mapper.shard_state(0), Some(ShardState::Draining));
2506 assert_eq!(mapper.active_shard_count(), 2);
2507
2508 // Across many hashes, select_shard must not pick id 0.
2509 for hash in 0u64..10_000 {
2510 let picked = mapper.select_shard(hash);
2511 assert_ne!(
2512 picked, 0,
2513 "select_shard returned a Draining shard id 0 — \
2514 producer pushes there would block finalize_draining (#48)"
2515 );
2516 }
2517 }
2518
2519 /// Regression: BUG_REPORT.md #33 — `set_draining(true)` and a
2520 /// concurrent `record_push` race on `pushes_since_drain_start`.
2521 /// The race itself can't be eliminated without a CAS loop on
2522 /// the counter (which would penalize the hot path), so the
2523 /// best we can pin is: after the dust settles, the counter
2524 /// value is bounded — never larger than the number of pushes
2525 /// that genuinely overlapped the transition. This catches a
2526 /// regression where the store-zero stops happening, where the
2527 /// flag publish stops happening, or where future code adds
2528 /// drift that compounds the race across many transitions.
2529 #[test]
2530 fn set_draining_resets_counter_under_concurrent_pushes() {
2531 use std::sync::Barrier;
2532 use std::thread;
2533
2534 const ITERATIONS: usize = 200;
2535 const PUSHERS: usize = 4;
2536
2537 for _ in 0..ITERATIONS {
2538 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
2539
2540 // Sanity: counter starts at zero.
2541 assert_eq!(collector.pushes_since_drain_start(), 0);
2542
2543 // Pre-load with a noticeable amount of "before drain"
2544 // pushes so the reset has work to do.
2545 for _ in 0..50 {
2546 collector.record_push(1);
2547 }
2548 assert_eq!(collector.pushes_since_drain_start(), 50);
2549
2550 // Race: every pusher hammers `record_push` while one
2551 // thread calls `set_draining(true)`. After the barrier,
2552 // we want to observe that the counter ends up bounded
2553 // by PUSHERS (the number of pushes that genuinely
2554 // overlapped the transition) — and crucially NOT 50+,
2555 // which is what the buggy code with a missing reset
2556 // would leave behind.
2557 let barrier = Arc::new(Barrier::new(PUSHERS + 1));
2558 let mut handles = Vec::with_capacity(PUSHERS);
2559 for _ in 0..PUSHERS {
2560 let c = Arc::clone(&collector);
2561 let b = Arc::clone(&barrier);
2562 handles.push(thread::spawn(move || {
2563 b.wait();
2564 c.record_push(1);
2565 }));
2566 }
2567
2568 barrier.wait();
2569 collector.set_draining(true);
2570 for h in handles {
2571 h.join().unwrap();
2572 }
2573
2574 // After reset + at-most-PUSHERS racing pushes, the
2575 // counter is bounded. The strong guarantee we want is
2576 // "the 50 pre-drain pushes did NOT survive the reset."
2577 // Anything <= PUSHERS is acceptable — the race may
2578 // count any subset of the racing pushes.
2579 let final_count = collector.pushes_since_drain_start();
2580 assert!(
2581 final_count <= PUSHERS as u64,
2582 "set_draining reset is broken: counter is {} after reset, \
2583 expected at most {} (#33)",
2584 final_count,
2585 PUSHERS
2586 );
2587 }
2588 }
2589
2590 /// Regression: BUG_REPORT.md #49 — `finalize_draining` must
2591 /// drop the `shards` write lock before calling `on_shard_removed`,
2592 /// so a callback that re-enters the mapper (read methods like
2593 /// `shard_state`, `active_shard_ids`, etc.) does not deadlock.
2594 #[test]
2595 fn finalize_draining_does_not_deadlock_on_callback_reentry() {
2596 use std::sync::Mutex;
2597 let policy = ScalingPolicy {
2598 min_shards: 1,
2599 max_shards: 8,
2600 cooldown: Duration::from_nanos(1),
2601 ..Default::default()
2602 };
2603 let mapper = Arc::new(ShardMapper::new(2, 1024, policy).unwrap());
2604
2605 // Set a callback that re-enters the mapper. Before the fix
2606 // this acquires `shards.read()` while finalize_draining
2607 // still holds `shards.write()`, deadlocking on parking_lot's
2608 // non-recursive RwLock.
2609 type Observation = (u16, Option<ShardState>);
2610 let observed_states: Arc<Mutex<Vec<Observation>>> = Arc::new(Mutex::new(Vec::new()));
2611 {
2612 let mapper_for_cb = Arc::clone(&mapper);
2613 let observed = Arc::clone(&observed_states);
2614 mapper.set_on_shard_removed(move |id| {
2615 let st = mapper_for_cb.shard_state(id);
2616 observed.lock().unwrap().push((id, st));
2617 });
2618 }
2619
2620 // Drive a shard all the way to Stopped: drain it, mark its
2621 // metrics empty (current_len = 0), and drop drain_started
2622 // far enough back that the 100ms gate is satisfied.
2623 mapper.drain_specific(0).unwrap();
2624 {
2625 let mut shards = mapper.shards.write();
2626 let s = shards.iter_mut().find(|s| s.id == 0).unwrap();
2627 s.metrics.current_len.store(0, AtomicOrdering::Relaxed);
2628 s.metrics
2629 .pushes_since_drain_start
2630 .store(0, AtomicOrdering::Relaxed);
2631 // Backdate drain_started so the elapsed > 100ms gate trips.
2632 s.drain_started = Some(Instant::now() - Duration::from_secs(1));
2633 }
2634
2635 // The call below would deadlock with the bug present.
2636 let stopped = mapper.finalize_draining();
2637 assert_eq!(stopped, vec![0]);
2638
2639 // The callback must have run AND been able to read state
2640 // (proving the lock was released).
2641 let observed = observed_states.lock().unwrap().clone();
2642 assert_eq!(observed.len(), 1);
2643 assert_eq!(observed[0].0, 0);
2644 // State should be `Stopped` (set by finalize_draining before
2645 // the lock was dropped).
2646 assert_eq!(observed[0].1, Some(ShardState::Stopped));
2647 }
2648
2649 /// `activate` must signal whether a state transition
2650 /// actually occurred so callers can avoid double-counting.
2651 /// Pre-fix it returned `Result<(), _>`, so a caller that
2652 /// invoked `activate` twice on the same shard incremented its
2653 /// own external counter (e.g. `ShardManager::num_shards`)
2654 /// twice for one logical transition.
2655 #[test]
2656 fn activate_returns_true_on_transition_and_false_on_idempotent_call() {
2657 let policy = ScalingPolicy {
2658 min_shards: 1,
2659 max_shards: 16,
2660 cooldown: Duration::from_nanos(1),
2661 ..Default::default()
2662 };
2663 let mapper = ShardMapper::new(2, 1024, policy).unwrap();
2664
2665 // Newly-provisioned shard 2 → Active: real transition.
2666 let new_ids = mapper.scale_up_provisioning(1).unwrap();
2667 assert_eq!(new_ids, vec![2]);
2668 let first = mapper.activate(2).unwrap();
2669 assert!(
2670 first,
2671 "first activate on a Provisioning shard must return true"
2672 );
2673 assert_eq!(mapper.active_shard_count(), 3);
2674
2675 // Second activate on the same already-Active shard:
2676 // idempotent, no transition.
2677 let second = mapper.activate(2).unwrap();
2678 assert!(
2679 !second,
2680 "activate on an already-Active shard must return false; \
2681 pre-fix this returned Ok(()) and the caller couldn't tell"
2682 );
2683 // active_count must NOT have moved.
2684 assert_eq!(
2685 mapper.active_shard_count(),
2686 3,
2687 "idempotent activate must not bump active_count"
2688 );
2689 }
2690}