Skip to main content

net/shard/
mod.rs

1//! Shard management for parallel event ingestion.
2//!
3//! The shard module provides:
4//! - Lock-free ring buffers for high-throughput event queuing
5//! - Per-shard timestamp generation (no cross-shard contention)
6//! - Batch assembly with adaptive sizing
7//! - Shard manager for coordinating multiple shards
8//! - Dynamic shard scaling with weighted producer routing
9
10mod batch;
11mod mapper;
12mod ring_buffer;
13
14pub use batch::{AdaptiveBatcher, BatchWorker};
15pub use mapper::{
16    ScalingDecision, ScalingError, ShardMapper, ShardMetrics, ShardMetricsCollector, ShardState,
17};
18// `RingBuffer` and `BufferFullError` are intentionally NOT re-exported.
19// External callers go through `EventBus` / `ShardManager`, which
20// uphold the SPSC contract via `Mutex<Shard>`. Exposing the raw ring
21// buffer publicly was a silent-UB footgun — anyone wrapping it in an
22// `Arc` and pushing from two threads got data corruption with no
23// compile-time signal. `BufferFullError` is not
24// re-exported here either: callers see it as `IngestionError::Backpressure`.
25pub(crate) use ring_buffer::RingBuffer;
26
27// Re-export ScalingPolicy from config for convenience
28pub use crate::config::ScalingPolicy;
29
30use bytes::Bytes;
31
32use crate::config::BackpressureMode;
33use crate::error::IngestionError;
34use crate::event::{InternalEvent, RawEvent};
35use crate::timestamp::TimestampGenerator;
36
37use serde_json::Value as JsonValue;
38use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
39use std::sync::Arc;
40use std::time::Instant;
41
42/// Atomic counters for a single shard. Kept outside `Shard` as `Arc`s
43/// so `ShardManager::stats()` can aggregate them without locking each
44/// shard's mutex.
45#[derive(Debug, Default)]
46pub struct ShardCounters {
47    /// Total events ingested into this shard.
48    pub events_ingested: AtomicU64,
49    /// Events dropped due to backpressure.
50    pub events_dropped: AtomicU64,
51    /// Batches successfully dispatched to the adapter.
52    pub batches_dispatched: AtomicU64,
53}
54
55/// Statistics for a single shard (snapshot).
56#[derive(Debug, Default, Clone, Copy)]
57pub struct ShardStats {
58    /// Total events ingested.
59    pub events_ingested: u64,
60    /// Events dropped due to backpressure.
61    pub events_dropped: u64,
62    /// Batches dispatched to adapter.
63    pub batches_dispatched: u64,
64    /// Events that arrived at `ingest_raw_batch` but had no resolvable
65    /// shard (e.g. the routing table was rebuilt mid-dispatch and the
66    /// hashed shard id is no longer present). These cannot be
67    /// attributed to a per-shard counter, so they are tracked at the
68    /// `ShardManager` level and surfaced through aggregated `stats()`.
69    pub events_unrouted: u64,
70}
71
72impl ShardCounters {
73    /// Load a consistent snapshot of the counters.
74    ///
75    /// `events_unrouted` is left at zero here — it is a manager-level
76    /// counter, not a per-shard one. `ShardManager::stats()` fills it
77    /// in after summing per-shard fields.
78    #[inline]
79    pub fn snapshot(&self) -> ShardStats {
80        ShardStats {
81            events_ingested: self.events_ingested.load(AtomicOrdering::Relaxed),
82            events_dropped: self.events_dropped.load(AtomicOrdering::Relaxed),
83            batches_dispatched: self.batches_dispatched.load(AtomicOrdering::Relaxed),
84            events_unrouted: 0,
85        }
86    }
87}
88
89/// A single shard with its own ring buffer and timestamp generator.
90pub struct Shard {
91    /// Shard identifier.
92    pub id: u16,
93    /// Ring buffer for event queuing.
94    ring_buffer: RingBuffer<InternalEvent>,
95    /// Shard-local timestamp generator (no contention).
96    timestamp_gen: TimestampGenerator,
97    /// Shared atomic counters (also referenced from `ShardTable` for
98    /// lock-free aggregation).
99    counters: Arc<ShardCounters>,
100    /// Optional metrics collector for dynamic scaling.
101    metrics_collector: Option<Arc<ShardMetricsCollector>>,
102    /// Ring buffer capacity (for metrics).
103    capacity: usize,
104}
105
106impl Shard {
107    /// Create a new shard.
108    pub fn new(id: u16, capacity: usize) -> Self {
109        Self {
110            id,
111            ring_buffer: RingBuffer::new(capacity),
112            timestamp_gen: TimestampGenerator::new(),
113            counters: Arc::new(ShardCounters::default()),
114            metrics_collector: None,
115            capacity,
116        }
117    }
118
119    /// Create a new shard with a metrics collector for dynamic scaling.
120    pub fn with_metrics(id: u16, capacity: usize, metrics: Arc<ShardMetricsCollector>) -> Self {
121        Self {
122            id,
123            ring_buffer: RingBuffer::new(capacity),
124            timestamp_gen: TimestampGenerator::new(),
125            counters: Arc::new(ShardCounters::default()),
126            metrics_collector: Some(metrics),
127            capacity,
128        }
129    }
130
131    /// Clone the atomic counter handle (for lock-free aggregation).
132    #[inline]
133    pub fn counters(&self) -> Arc<ShardCounters> {
134        self.counters.clone()
135    }
136
137    /// Set the metrics collector.
138    pub fn set_metrics_collector(&mut self, metrics: Arc<ShardMetricsCollector>) {
139        self.metrics_collector = Some(metrics);
140    }
141
142    /// Try to push a raw event (pre-serialized bytes) into the shard's ring buffer.
143    /// Returns the assigned insertion timestamp on success.
144    ///
145    /// This is the fastest ingestion path - no serialization or hashing needed.
146    #[inline]
147    pub fn try_push_raw(&mut self, raw: Bytes) -> Result<u64, IngestionError> {
148        let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
149        let ts = self.timestamp_gen.next();
150        let event = InternalEvent::new(raw, ts, self.id);
151
152        match self.ring_buffer.try_push(event) {
153            Ok(()) => {
154                self.counters
155                    .events_ingested
156                    .fetch_add(1, AtomicOrdering::Relaxed);
157                if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
158                    collector.record_push(start.elapsed().as_nanos() as u64);
159                    collector.record_buffer_len(self.ring_buffer.len());
160                }
161                Ok(ts)
162            }
163            Err(_) => {
164                self.counters
165                    .events_dropped
166                    .fetch_add(1, AtomicOrdering::Relaxed);
167                Err(IngestionError::Backpressure)
168            }
169        }
170    }
171
172    /// Try to push a JSON value into the shard's ring buffer.
173    /// Returns the assigned insertion timestamp on success.
174    ///
175    /// This serializes the value once before storing.
176    #[inline]
177    pub fn try_push(&mut self, raw: JsonValue) -> Result<u64, IngestionError> {
178        let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
179        let ts = self.timestamp_gen.next();
180        let event = InternalEvent::from_value(raw, ts, self.id);
181
182        match self.ring_buffer.try_push(event) {
183            Ok(()) => {
184                self.counters
185                    .events_ingested
186                    .fetch_add(1, AtomicOrdering::Relaxed);
187                if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
188                    collector.record_push(start.elapsed().as_nanos() as u64);
189                    collector.record_buffer_len(self.ring_buffer.len());
190                }
191                Ok(ts)
192            }
193            Err(_) => {
194                self.counters
195                    .events_dropped
196                    .fetch_add(1, AtomicOrdering::Relaxed);
197                Err(IngestionError::Backpressure)
198            }
199        }
200    }
201
202    /// Pop a batch of events from the ring buffer.
203    ///
204    /// Allocates a fresh `Vec`. Prefer [`pop_batch_into`] in drain
205    /// loops where the per-cycle `Vec` allocation should happen
206    /// outside the shard mutex.
207    ///
208    /// [`pop_batch_into`]: Self::pop_batch_into
209    #[inline]
210    pub fn pop_batch(&mut self, max: usize) -> Vec<InternalEvent> {
211        let out = self.ring_buffer.pop_batch(max);
212        if let Some(collector) = &self.metrics_collector {
213            collector.record_buffer_len(self.ring_buffer.len());
214        }
215        out
216    }
217
218    /// Pop a batch of events into a caller-owned buffer.
219    ///
220    /// Append semantics: does **not** clear `dst`; reserves
221    /// `count` slots and pushes drained elements onto the end.
222    /// Returns the number drained this call. Use this in
223    /// steady-state drain loops where the caller keeps a scratch
224    /// `Vec` across cycles, so the per-cycle allocation moves out
225    /// of the consumer's critical section.
226    #[inline]
227    pub fn pop_batch_into(&mut self, dst: &mut Vec<InternalEvent>, max: usize) -> usize {
228        let n = self.ring_buffer.pop_batch_into(dst, max);
229        if let Some(collector) = &self.metrics_collector {
230            collector.record_buffer_len(self.ring_buffer.len());
231        }
232        n
233    }
234
235    /// Try to pop a single event from the ring buffer.
236    #[inline]
237    pub fn try_pop(&mut self) -> Option<InternalEvent> {
238        let out = self.ring_buffer.try_pop();
239        if let Some(collector) = &self.metrics_collector {
240            collector.record_buffer_len(self.ring_buffer.len());
241        }
242        out
243    }
244
245    /// Producer-side eviction of the oldest event.
246    ///
247    /// Used by `BackpressureMode::DropOldest` to make room for a
248    /// new push when the buffer is full. Bypasses the ring buffer's
249    /// consumer-thread tracking (the producer thread is calling
250    /// what is normally a consumer-side operation). Safe because
251    /// the outer shard mutex serializes this against any concurrent
252    /// `try_pop` from the legitimate consumer (the batch worker).
253    #[inline]
254    pub(crate) fn evict_oldest(&mut self) -> Option<InternalEvent> {
255        self.ring_buffer.evict_oldest()
256    }
257
258    /// Get the current buffer length.
259    #[inline]
260    pub fn len(&self) -> usize {
261        self.ring_buffer.len()
262    }
263
264    /// Check if the buffer is empty.
265    #[inline]
266    pub fn is_empty(&self) -> bool {
267        self.ring_buffer.is_empty()
268    }
269
270    /// Check if the buffer is full.
271    #[inline]
272    pub fn is_full(&self) -> bool {
273        self.ring_buffer.is_full()
274    }
275
276    /// Get the fill ratio (0.0 - 1.0).
277    #[inline]
278    pub fn fill_ratio(&self) -> f64 {
279        if self.capacity == 0 {
280            0.0
281        } else {
282            self.ring_buffer.len() as f64 / self.capacity as f64
283        }
284    }
285
286    /// Get the ring buffer capacity.
287    #[inline]
288    pub fn capacity(&self) -> usize {
289        self.capacity
290    }
291
292    /// Get a snapshot of shard statistics.
293    pub fn stats(&self) -> ShardStats {
294        self.counters.snapshot()
295    }
296
297    /// Record a batch dispatch.
298    pub fn record_batch_dispatch(&self) {
299        self.counters
300            .batches_dispatched
301            .fetch_add(1, AtomicOrdering::Relaxed);
302    }
303}
304
305/// Immutable routing table: shards + index + counter handles.
306///
307/// Placed behind an `ArcSwap` on `ShardManager` so the common read
308/// path (`ingest`, `ingest_raw`, `with_shard`, `stats`) is
309/// lock-free. Rebuilt on scale up/down via RCU-style swap.
310pub struct ShardTable {
311    /// All shards, indexed by position. `Arc<Mutex<Shard>>` lets a new
312    /// table share shard handles with the previous table (cheap Arc
313    /// clones during rebuild).
314    shards: Vec<Arc<parking_lot::Mutex<Shard>>>,
315    /// Parallel vector of counter handles. Exposes stats without
316    /// locking the shard mutex.
317    counters: Vec<Arc<ShardCounters>>,
318    /// Map from shard ID to index in `shards`/`counters`.
319    shard_index: std::collections::HashMap<u16, usize>,
320}
321
322impl ShardTable {
323    fn new(shards: Vec<Shard>) -> Self {
324        let mut shard_index = std::collections::HashMap::with_capacity(shards.len());
325        let mut counters = Vec::with_capacity(shards.len());
326        let shards: Vec<_> = shards
327            .into_iter()
328            .enumerate()
329            .map(|(idx, s)| {
330                shard_index.insert(s.id, idx);
331                counters.push(s.counters());
332                Arc::new(parking_lot::Mutex::new(s))
333            })
334            .collect();
335        Self {
336            shards,
337            counters,
338            shard_index,
339        }
340    }
341}
342
343/// Manager for multiple shards.
344///
345/// The ShardManager can operate in two modes:
346/// 1. Static mode (default): Fixed number of shards, simple hash-based routing
347/// 2. Dynamic mode: Shards can be added/removed based on load, weighted routing
348pub struct ShardManager {
349    /// Routing table. Swapped atomically on scale up/down so readers
350    /// never see a partially-updated `(shards, shard_index)` pair.
351    table: arc_swap::ArcSwap<ShardTable>,
352    /// Current number of active shards.
353    num_shards: std::sync::atomic::AtomicU16,
354    /// Backpressure mode.
355    backpressure_mode: BackpressureMode,
356    /// Ring buffer capacity for new shards.
357    ring_buffer_capacity: usize,
358    /// Optional shard mapper for dynamic scaling.
359    mapper: Option<Arc<ShardMapper>>,
360    /// Serializes concurrent `add_shard` / `remove_shard` rebuilds.
361    /// Not on the ingest path.
362    rebuild_lock: parking_lot::Mutex<()>,
363    /// Events dropped because no destination shard was resolvable.
364    /// Distinct from per-shard `events_dropped` (which tracks
365    /// backpressure on a known shard) — this counts events whose
366    /// hashed shard id was missing from the routing table at lookup
367    /// time, e.g. due to a concurrent scale-down. Surfaced via
368    /// `stats().events_unrouted`.
369    events_unrouted: AtomicU64,
370}
371
372impl ShardManager {
373    /// Create a new shard manager (static mode).
374    pub fn new(
375        num_shards: u16,
376        ring_buffer_capacity: usize,
377        backpressure_mode: BackpressureMode,
378    ) -> Self {
379        let shards: Vec<Shard> = (0..num_shards)
380            .map(|id| Shard::new(id, ring_buffer_capacity))
381            .collect();
382
383        Self {
384            table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
385            num_shards: std::sync::atomic::AtomicU16::new(num_shards),
386            backpressure_mode,
387            ring_buffer_capacity,
388            mapper: None,
389            rebuild_lock: parking_lot::Mutex::new(()),
390            events_unrouted: AtomicU64::new(0),
391        }
392    }
393
394    /// Create a new shard manager with dynamic scaling enabled.
395    pub fn with_mapper(
396        num_shards: u16,
397        ring_buffer_capacity: usize,
398        backpressure_mode: BackpressureMode,
399        policy: ScalingPolicy,
400    ) -> Result<Self, ScalingError> {
401        let mapper = Arc::new(ShardMapper::new(num_shards, ring_buffer_capacity, policy)?);
402
403        let shards: Vec<Shard> = (0..num_shards)
404            .map(|id| {
405                let metrics = mapper.metrics_collector(id).ok_or_else(|| {
406                    ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", id))
407                })?;
408                Ok(Shard::with_metrics(id, ring_buffer_capacity, metrics))
409            })
410            .collect::<Result<Vec<_>, ScalingError>>()?;
411
412        Ok(Self {
413            table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
414            num_shards: std::sync::atomic::AtomicU16::new(num_shards),
415            backpressure_mode,
416            ring_buffer_capacity,
417            mapper: Some(mapper),
418            rebuild_lock: parking_lot::Mutex::new(()),
419            events_unrouted: AtomicU64::new(0),
420        })
421    }
422
423    /// Get the shard mapper (if dynamic scaling is enabled).
424    pub fn mapper(&self) -> Option<&Arc<ShardMapper>> {
425        self.mapper.as_ref()
426    }
427
428    /// Get the number of active shards.
429    #[inline]
430    pub fn num_shards(&self) -> u16 {
431        self.num_shards.load(std::sync::atomic::Ordering::Acquire)
432    }
433
434    /// Get the backpressure mode.
435    #[inline]
436    pub fn backpressure_mode(&self) -> BackpressureMode {
437        self.backpressure_mode
438    }
439
440    /// Select a shard for an event based on its content hash.
441    /// Uses weighted selection if dynamic scaling is enabled.
442    ///
443    /// **Prefer [`select_shard_by_hash`].** This method serializes the
444    /// `JsonValue` to bytes just to compute the hash; if you already
445    /// have a `RawEvent` (or any pre-computed `xxh3_64` of the
446    /// canonical bytes), pass that hash directly. The internal
447    /// ingest paths all do — this method exists for ad-hoc external
448    /// callers that haven't yet adopted the `RawEvent` pattern.
449    ///
450    /// [`select_shard_by_hash`]: Self::select_shard_by_hash
451    #[inline]
452    #[deprecated(
453        since = "0.10.0",
454        note = "serializes the value just to hash it; prefer `RawEvent::from_value(v).hash()` + `select_shard_by_hash` to avoid the duplicate serialization"
455    )]
456    #[expect(
457        clippy::expect_used,
458        reason = "serde_json::to_vec on a JsonValue (which round-tripped from JSON or was built via the type's own constructors) is infallible"
459    )]
460    pub fn select_shard(&self, event: &JsonValue) -> u16 {
461        // Use xxhash for fast, deterministic hashing. `to_vec` avoids the
462        // extra UTF-8 validation that `to_string` performs on the serialized
463        // buffer, since we only need the bytes for hashing.
464        let bytes = serde_json::to_vec(event).expect("Value serialization is infallible");
465        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
466        self.select_shard_by_hash(hash)
467    }
468
469    /// Select a shard using a pre-computed hash.
470    ///
471    /// This is faster than `select_shard` when you already have the hash.
472    #[inline]
473    pub fn select_shard_by_hash(&self, hash: u64) -> u16 {
474        if let Some(ref mapper) = self.mapper {
475            // Dynamic mode: use weighted selection
476            mapper.select_shard(hash)
477        } else {
478            // Static mode: Lemire's bias-free multiply-shift mapping.
479            // Pre-fix this ran `hash % num_shards` per event — modulo
480            // by a non-constant `u64` is a `div` on every modern uarch
481            // (~20-25 cycles) and dwarfs the upstream xxh3 hash cost
482            // for the static-mode hot path. The multiply-shift form
483            // `((hash * n) >> 64)` is ~3 cycles and is the same
484            // unbiased reduction already used in `mapper.rs:660`. Per
485            // net-perf #7.
486            //
487            // The defensive guard against `num_shards == 0` stays —
488            // config validation rejects 0 at startup and `scale_down`
489            // requires `current > min_shards >= 1` so this branch is
490            // unreachable today, but a stray 0 here would otherwise
491            // make the multiply land at index 0 silently while the
492            // legacy modulo would have panicked. Returning 0
493            // explicitly preserves the legacy "any select returns
494            // shard 0" failure mode without the panic.
495            let num_shards = self.num_shards.load(std::sync::atomic::Ordering::Acquire);
496            debug_assert!(num_shards > 0, "num_shards must be > 0");
497            if num_shards == 0 {
498                return 0;
499            }
500            ((hash as u128 * num_shards as u128) >> 64) as u16
501        }
502    }
503
504    /// Resolve a shard ID to its table index, using the fast path in
505    /// static mode (shard_id == index).
506    #[inline]
507    fn resolve_idx(&self, table: &ShardTable, shard_id: u16) -> Option<usize> {
508        if self.mapper.is_none() {
509            Some(shard_id as usize)
510        } else {
511            table.shard_index.get(&shard_id).copied()
512        }
513    }
514
515    /// Push `raw` into `shard`, handling backpressure. Only clones the
516    /// bytes when `DropOldest` needs them for the retry path.
517    #[inline]
518    fn push_with_backpressure(
519        &self,
520        shard: &mut Shard,
521        shard_id: u16,
522        raw: Bytes,
523    ) -> Result<(u16, u64), IngestionError> {
524        match self.backpressure_mode {
525            BackpressureMode::DropOldest => match shard.try_push_raw(raw.clone()) {
526                Ok(ts) => Ok((shard_id, ts)),
527                Err(IngestionError::Backpressure) => {
528                    // The failed try_push_raw incremented events_dropped for
529                    // the *new* event, but the new event isn't actually
530                    // dropped — the oldest is. Correct the stats: undo the
531                    // spurious drop count, evict the oldest (which is the real
532                    // drop), and retry with the same ref-counted bytes.
533                    //
534                    // Use the producer-side `evict_oldest` rather
535                    // than `try_pop`. Calling `try_pop` from the
536                    // producer thread would violate the SPSC consumer
537                    // contract (the
538                    // legitimate consumer is the batch worker, on a
539                    // different task / thread).
540                    //
541                    // Transient stats note: a concurrent reader of
542                    // `manager.stats().events_dropped` between the
543                    // `fetch_sub` and the second `fetch_add` would
544                    // briefly observe the pre-correction value
545                    // (one less than reality). The net delta over
546                    // the whole retry is `+1`, matching the real
547                    // drop. Documented as snapshot-not-coherent
548                    // per `ShardCounters::snapshot`'s contract.
549                    shard
550                        .counters
551                        .events_dropped
552                        .fetch_sub(1, AtomicOrdering::Relaxed);
553                    let _ = shard.evict_oldest();
554                    shard
555                        .counters
556                        .events_dropped
557                        .fetch_add(1, AtomicOrdering::Relaxed);
558                    shard.try_push_raw(raw).map(|ts| (shard_id, ts))
559                }
560                Err(e) => Err(e),
561            },
562            BackpressureMode::Sample { .. } => match shard.try_push_raw(raw) {
563                Ok(ts) => Ok((shard_id, ts)),
564                Err(IngestionError::Backpressure) => Err(IngestionError::Sampled),
565                Err(e) => Err(e),
566            },
567            BackpressureMode::DropNewest | BackpressureMode::FailProducer => {
568                shard.try_push_raw(raw).map(|ts| (shard_id, ts))
569            }
570        }
571    }
572
573    /// Ingest an event into the appropriate shard.
574    pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError> {
575        // Serialize once upfront - avoids clone on retry
576        let raw = Bytes::from(serde_json::to_vec(&event)?);
577        let hash = xxhash_rust::xxh3::xxh3_64(&raw);
578        let shard_id = self.select_shard_by_hash(hash);
579
580        let table = self.table.load();
581        // Surface "no routable destination" as `Unrouted` (not
582        // `Backpressure`) and bump the manager-level
583        // `events_unrouted` counter so per-event vs. batch-path
584        // accounting agree. The secondary `table.shards.get(idx)`
585        // miss should be impossible by the `shard_index ↔ shards`
586        // invariant — keep returning `Unrouted` defensively rather
587        // than panicking.
588        let Some(idx) = self.resolve_idx(&table, shard_id) else {
589            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
590            return Err(IngestionError::Unrouted);
591        };
592        let Some(shard_lock) = table.shards.get(idx) else {
593            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
594            return Err(IngestionError::Unrouted);
595        };
596
597        let mut shard = shard_lock.lock();
598        self.push_with_backpressure(&mut shard, shard_id, raw)
599    }
600
601    /// Ingest a raw event (pre-serialized with cached hash).
602    ///
603    /// This is the fastest ingestion path:
604    /// - Uses pre-computed hash for shard selection (no serialization)
605    /// - Stores bytes directly (no clone needed, reference-counted)
606    #[inline]
607    pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError> {
608        let shard_id = self.select_shard_by_hash(event.hash());
609
610        let table = self.table.load();
611        // See `ingest` above for the `Unrouted` rationale.
612        let Some(idx) = self.resolve_idx(&table, shard_id) else {
613            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
614            return Err(IngestionError::Unrouted);
615        };
616        let Some(shard_lock) = table.shards.get(idx) else {
617            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
618            return Err(IngestionError::Unrouted);
619        };
620
621        let mut shard = shard_lock.lock();
622        self.push_with_backpressure(&mut shard, shard_id, event.bytes())
623    }
624
625    /// Ingest a batch of pre-serialized events, grouped by shard.
626    ///
627    /// Each destination shard's mutex is acquired once and all of that
628    /// shard's events are pushed before releasing. With a uniform hash
629    /// distribution this amortizes lock acquisitions from O(events) to
630    /// O(shards). Backpressure semantics match per-event `ingest_raw`.
631    ///
632    /// Returns `(success, unrouted)` where `success` is the count of
633    /// events successfully pushed onto a shard's ring buffer and
634    /// `unrouted` is the count of events whose destination shard was
635    /// not present in the routing table at the time of dispatch
636    /// (e.g. concurrent scale-down). The remainder
637    /// (`total - success - unrouted`) is the backpressure-class drop
638    /// count.
639    ///
640    /// Returns `(success, unrouted)` rather than just `success`
641    /// so the bus can subtract `unrouted` before publishing
642    /// `events_dropped`. Returning only `success` would let the
643    /// bus's `dropped = total - success` accounting double-count
644    /// unrouted events — they're already tallied on
645    /// `events_unrouted` inside this function.
646    pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize) {
647        if events.is_empty() {
648            return (0, 0);
649        }
650
651        let table = self.table.load();
652
653        // Bucket by table index. Using a Vec<Vec<_>> keyed by index is
654        // cheaper than a HashMap for the common case of a small
655        // shard count.
656        let mut groups: Vec<Vec<Bytes>> = (0..table.shards.len()).map(|_| Vec::new()).collect();
657        let mut group_ids: Vec<u16> = vec![0; groups.len()];
658
659        let mut unrouted = 0usize;
660        for event in events {
661            let shard_id = self.select_shard_by_hash(event.hash());
662            let Some(idx) = self.resolve_idx(&table, shard_id) else {
663                // Routing table doesn't contain the chosen shard
664                // (e.g. concurrent scale-down removed it). The drop
665                // can't be attributed to a per-shard counter; track
666                // it on the manager-level `events_unrouted` so
667                // bus-level vs. per-shard reconciliation is exact.
668                unrouted += 1;
669                continue;
670            };
671            if let Some(g) = groups.get_mut(idx) {
672                if g.is_empty() {
673                    group_ids[idx] = shard_id;
674                }
675                g.push(event.bytes());
676            }
677        }
678        if unrouted > 0 {
679            self.events_unrouted
680                .fetch_add(unrouted as u64, AtomicOrdering::Relaxed);
681        }
682
683        let mut success = 0usize;
684        for (idx, group) in groups.into_iter().enumerate() {
685            if group.is_empty() {
686                continue;
687            }
688            let shard_id = group_ids[idx];
689            let Some(shard_lock) = table.shards.get(idx) else {
690                continue;
691            };
692            let mut shard = shard_lock.lock();
693            for bytes in group {
694                if self
695                    .push_with_backpressure(&mut shard, shard_id, bytes)
696                    .is_ok()
697                {
698                    success += 1;
699                }
700            }
701        }
702
703        (success, unrouted)
704    }
705
706    /// Get a reference to a shard by ID.
707    pub fn shard(&self, id: u16) -> Option<ShardRef> {
708        let table = self.table.load();
709        let idx = self.resolve_idx(&table, id)?;
710        let shard = table.shards.get(idx)?.clone();
711        Some(ShardRef { shard })
712    }
713
714    /// Execute a function with exclusive access to a shard.
715    pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
716    where
717        F: FnOnce(&mut Shard) -> R,
718    {
719        let table = self.table.load();
720        let idx = self.resolve_idx(&table, id)?;
721        table.shards.get(idx).map(|shard_lock| {
722            let mut shard = shard_lock.lock();
723            f(&mut shard)
724        })
725    }
726
727    /// Returns true if every shard's ring buffer is empty.
728    ///
729    /// Cheaper than `shard_ids()` + repeated `with_shard`: loads the
730    /// routing table once and checks each shard behind a brief lock.
731    pub fn all_shards_empty(&self) -> bool {
732        let table = self.table.load();
733        table.shards.iter().all(|s| s.lock().is_empty())
734    }
735
736    /// Iterate over all active shard IDs.
737    pub fn shard_ids(&self) -> Vec<u16> {
738        self.table.load().shard_index.keys().copied().collect()
739    }
740
741    /// Sum of `len()` across every shard's ring buffer.
742    pub fn total_pending_in_rings(&self) -> u64 {
743        let table = self.table.load();
744        table.shards.iter().map(|s| s.lock().len() as u64).sum()
745    }
746
747    /// Best-effort variant of [`Self::total_pending_in_rings`] that
748    /// never blocks: every shard whose mutex is currently held is
749    /// skipped (counted as zero). Use this from `Drop` or any path
750    /// that may run on a thread already holding a shard lock
751    /// (single-thread runtime + panic during shutdown is the
752    /// canonical hazard); the blocking variant would self-deadlock
753    /// there.
754    ///
755    /// Returns `(sum_counted, uncounted_shard_count)` so the caller
756    /// can log the uncertainty in the result.
757    pub fn try_total_pending_in_rings(&self) -> (u64, usize) {
758        let table = self.table.load();
759        let mut sum: u64 = 0;
760        let mut uncounted: usize = 0;
761        for s in table.shards.iter() {
762            match s.try_lock() {
763                Some(guard) => sum += guard.len() as u64,
764                None => uncounted += 1,
765            }
766        }
767        (sum, uncounted)
768    }
769
770    /// Get aggregated statistics from all shards.
771    ///
772    /// Lock-free: reads each shard's atomic counters directly via the
773    /// parallel `counters` vector on the routing table, with no per-
774    /// shard mutex acquisition. `events_unrouted` is sourced from the
775    /// `ShardManager` itself rather than the per-shard counters since
776    /// unrouted events have no shard to attribute to.
777    pub fn stats(&self) -> ShardStats {
778        let table = self.table.load();
779        let mut total = ShardStats::default();
780        for counters in table.counters.iter() {
781            let snap = counters.snapshot();
782            total.events_ingested += snap.events_ingested;
783            total.events_dropped += snap.events_dropped;
784            total.batches_dispatched += snap.batches_dispatched;
785        }
786        total.events_unrouted = self.events_unrouted.load(AtomicOrdering::Relaxed);
787        total
788    }
789
790    /// Rebuild the routing table with a closure that sees the old
791    /// `(shards, counters, shard_index)` and produces the new ones.
792    /// Serialized by `rebuild_lock` so concurrent scaling operations
793    /// can't race on read-modify-write of the table.
794    fn rebuild_table<F>(&self, f: F)
795    where
796        F: FnOnce(
797            &Vec<Arc<parking_lot::Mutex<Shard>>>,
798            &Vec<Arc<ShardCounters>>,
799            &std::collections::HashMap<u16, usize>,
800        ) -> ShardTable,
801    {
802        let _guard = self.rebuild_lock.lock();
803        let old = self.table.load();
804        let new = f(&old.shards, &old.counters, &old.shard_index);
805        self.table.store(Arc::new(new));
806    }
807
808    /// Add a new shard (for dynamic scaling).
809    /// Returns the new shard ID. The shard is in the routing table
810    /// and ready to be the destination of `select_shard` calls
811    /// **only after** [`activate_shard`] is called for it.
812    ///
813    /// Previously the mapper marked the shard `Active` *before* the
814    /// routing table was rebuilt and *before* any worker was wired up
815    /// to drain its ring buffer. Producers could `select_shard` to
816    /// the new id, push into its ring buffer, and have the events
817    /// stranded with no consumer. The fix uses
818    /// `scale_up_provisioning` so the mapper records the shard but
819    /// `select_shard` skips it, then `activate_shard` flips it to
820    /// `Active` once workers are ready.
821    ///
822    /// [`activate_shard`]: Self::activate_shard
823    pub fn add_shard(&self) -> Result<u16, ScalingError> {
824        self.add_shard_inner(false)
825    }
826
827    /// Like [`add_shard`] but bypasses the auto-scaling cooldown.
828    ///
829    /// Used by operator-initiated `manual_scale_up` paths. The
830    /// auto-scaling cooldown protects against the auto-scaling
831    /// monitor reacting too quickly to transient load spikes;
832    /// a deliberate operator action should not be rate-limited
833    /// by that cadence. The `max_shards` budget check still
834    /// applies.
835    ///
836    /// [`add_shard`]: Self::add_shard
837    pub fn add_shard_force(&self) -> Result<u16, ScalingError> {
838        self.add_shard_inner(true)
839    }
840
841    fn add_shard_inner(&self, force: bool) -> Result<u16, ScalingError> {
842        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
843            "Dynamic scaling not enabled".into(),
844        ))?;
845
846        // Allocate the shard in `Provisioning` state — not yet
847        // selectable.
848        let new_ids = if force {
849            mapper.scale_up_provisioning_force(1)?
850        } else {
851            mapper.scale_up_provisioning(1)?
852        };
853        let new_id = new_ids[0];
854
855        let metrics = mapper.metrics_collector(new_id).ok_or_else(|| {
856            ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", new_id))
857        })?;
858        let new_shard = Shard::with_metrics(new_id, self.ring_buffer_capacity, metrics);
859        let new_counters = new_shard.counters();
860        let new_shard = Arc::new(parking_lot::Mutex::new(new_shard));
861
862        // Publish to the routing table so `with_shard` works (the
863        // drain worker the caller is about to spawn needs this) but
864        // the shard is still `Provisioning` so `select_shard` will
865        // not route producer pushes to it yet.
866        self.rebuild_table(|shards, counters, shard_index| {
867            let mut shards = shards.clone();
868            let mut counters = counters.clone();
869            let mut shard_index = shard_index.clone();
870            let idx = shards.len();
871            shards.push(new_shard.clone());
872            counters.push(new_counters.clone());
873            shard_index.insert(new_id, idx);
874            ShardTable {
875                shards,
876                counters,
877                shard_index,
878            }
879        });
880
881        // Don't bump `num_shards` yet — `activate_shard` does that
882        // when the shard becomes selectable.
883        Ok(new_id)
884    }
885
886    /// Activate a previously-provisioned shard. After this returns,
887    /// `select_shard` will route to the shard and producer pushes
888    /// will land in its ring buffer.
889    ///
890    /// Idempotent: calling on an already-`Active` shard is `Ok(())`.
891    ///
892    /// Pre-fix this unconditionally `fetch_add(1)`d
893    /// `num_shards` even when the mapper's `activate()` early-
894    /// returned for an already-`Active` shard. After repeated
895    /// activate calls, `num_shards` exceeded both the mapper's
896    /// `active_count` and the actual shard count, breaking
897    /// modulo-based shard selection (`select_shard`) and
898    /// producing stale routing decisions.  Post-fix gates the
899    /// `fetch_add` on the mapper's transition signal.
900    pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
901        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
902            "Dynamic scaling not enabled".into(),
903        ))?;
904        let transitioned = mapper.activate(shard_id)?;
905        if transitioned {
906            self.num_shards
907                .fetch_add(1, std::sync::atomic::Ordering::Release);
908        }
909        Ok(())
910    }
911
912    /// Start draining a shard (for dynamic scaling).
913    ///
914    /// Previously only flipped the metrics collector's `draining`
915    /// atomic, leaving `MappedShard.state` untouched. Result:
916    /// `select_shard` (which filters on `state == Active`) still
917    /// routed new producers to the shard. The fix calls into the
918    /// mapper, which atomically transitions the state to `Draining`
919    /// and (for accounting) decrements `active_count`, mirroring
920    /// `scale_down(N)` for a single targeted shard.
921    pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
922        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
923            "Dynamic scaling not enabled".into(),
924        ))?;
925        mapper.drain_specific(shard_id)
926    }
927
928    /// Remove a shard from the routing table.
929    ///
930    /// Previously this only unmapped the shard from the routing
931    /// table. The drain worker, on its next `with_shard` call,
932    /// observed `None` and exited — leaving any events still in the
933    /// ring buffer permanently stranded. The fix drains the ring
934    /// buffer into a caller-supplied scratch `Vec` **before** the
935    /// unmap, then returns the drained events so the caller
936    /// (typically `EventBus::remove_shard_internal`) can flush them
937    /// through to the adapter rather than dropping them.
938    ///
939    /// Returns `Ok(events)` where `events` is whatever was still
940    /// queued in the ring buffer at unmap time (possibly empty).
941    /// Caller is responsible for handing those off to the adapter.
942    pub fn remove_shard(
943        &self,
944        shard_id: u16,
945    ) -> Result<Vec<crate::event::InternalEvent>, ScalingError> {
946        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
947            "Dynamic scaling not enabled".into(),
948        ))?;
949
950        // Capture the mapper-side state *before* we unmap. This
951        // gates the `num_shards` decrement at the end so it stays
952        // symmetric with `activate_shard`'s `fetch_add`. The
953        // activate-failure rollback path (`bus.rs`) calls us on a
954        // shard that's still `Provisioning` — `add_shard` never
955        // bumped `num_shards` for it, so an unconditional
956        // `fetch_sub` here would leave the counter one below the
957        // table's actual size, breaking modulo-based shard
958        // selection. `Active` / `Draining` / `Stopped` shards all
959        // had `activate_shard` succeed against them at some point
960        // (it's the only way out of `Provisioning`), so they did
961        // bump `num_shards` and must decrement here.
962        let was_activated = matches!(
963            mapper.shard_state(shard_id),
964            Some(ShardState::Active) | Some(ShardState::Draining) | Some(ShardState::Stopped)
965        );
966
967        // Drain whatever is left in the ring buffer before unmapping.
968        // `with_shard` returns `None` once the shard is gone, so we
969        // do this *before* `rebuild_table`. We cap drain to a sane
970        // upper bound (`ring_buffer_capacity`) so a malformed shard
971        // can't pin us here forever.
972        let cap = self.ring_buffer_capacity;
973        let drained: Vec<crate::event::InternalEvent> = self
974            .with_shard(shard_id, |shard| {
975                let mut buf = Vec::with_capacity(shard.len().min(cap));
976                shard.pop_batch_into(&mut buf, cap);
977                buf
978            })
979            .unwrap_or_default();
980
981        let mut removed = false;
982        self.rebuild_table(|shards, counters, shard_index| {
983            let mut shards = shards.clone();
984            let mut counters = counters.clone();
985            let mut shard_index = shard_index.clone();
986
987            if let Some(idx) = shard_index.remove(&shard_id) {
988                removed = true;
989                shards.swap_remove(idx);
990                counters.swap_remove(idx);
991                // swap_remove moved the last element into `idx`: update its
992                // index mapping.
993                if idx < shards.len() {
994                    let moved_shard_id = shards[idx].lock().id;
995                    shard_index.insert(moved_shard_id, idx);
996                }
997            }
998
999            ShardTable {
1000                shards,
1001                counters,
1002                shard_index,
1003            }
1004        });
1005
1006        if removed && was_activated {
1007            self.num_shards
1008                .fetch_sub(1, std::sync::atomic::Ordering::Release);
1009        }
1010
1011        // Ask the mapper to drop the corresponding `MappedShard`
1012        // record. Without this sweep the mapper's
1013        // `shards: RwLock<Vec<MappedShard>>` would keep growing
1014        // across scale-up/down cycles (every scale-up appends a
1015        // fresh entry; `Stopped` entries are only removed by an
1016        // explicit `remove_specific_stopped_shard` /
1017        // `remove_stopped_shards` call). `evaluate_scaling`
1018        // filters by state but still iterates the full list, so
1019        // per-tick cost would grow with cumulative scaling history.
1020        //
1021        // The scaling monitor calls `mapper.finalize_draining()`
1022        // before invoking `bus.remove_shard_internal(id)` (which is
1023        // what calls us), so by the time we run the matching
1024        // `MappedShard` is already in `Stopped` state. We prune
1025        // ONLY this shard here, not every Stopped one — a bulk
1026        // sweep would prune sibling Stopped shards that a
1027        // sequential `manual_scale_down` is about to look up
1028        // state for in its next iteration's `remove_shard`. Once
1029        // the mapper had `None` for a sibling shard, the
1030        // `was_activated` gate above would observe it as
1031        // never-activated and skip the `num_shards` decrement,
1032        // leaving the counter one below the actual table size.
1033        mapper.remove_specific_stopped_shard(shard_id);
1034
1035        Ok(drained)
1036    }
1037
1038    /// Collect metrics from all shards (for dynamic scaling decisions).
1039    pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>> {
1040        self.mapper.as_ref().map(|m| m.collect_metrics())
1041    }
1042
1043    /// Evaluate and optionally execute scaling.
1044    pub fn evaluate_scaling(&self) -> ScalingDecision {
1045        self.mapper
1046            .as_ref()
1047            .map(|m| m.evaluate_scaling())
1048            .unwrap_or(ScalingDecision::None)
1049    }
1050}
1051
1052/// An owned handle to a shard. Holding this does not block scaling
1053/// operations; the shard stays alive via `Arc` refcount even if
1054/// removed from the table.
1055pub struct ShardRef {
1056    shard: Arc<parking_lot::Mutex<Shard>>,
1057}
1058
1059impl ShardRef {
1060    /// Lock the shard for exclusive access.
1061    pub fn lock(&self) -> parking_lot::MutexGuard<'_, Shard> {
1062        self.shard.lock()
1063    }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068    use super::*;
1069    use serde_json::json;
1070
1071    #[test]
1072    fn test_shard_push_pop() {
1073        let mut shard = Shard::new(0, 1024);
1074
1075        let ts = shard.try_push(json!({"test": 1})).unwrap();
1076        assert!(ts > 0);
1077        assert_eq!(shard.len(), 1);
1078
1079        let event = shard.try_pop().unwrap();
1080        assert_eq!(event.shard_id, 0);
1081        assert_eq!(event.insertion_ts, ts);
1082        assert!(shard.is_empty());
1083    }
1084
1085    /// A `Shard` configured with a `ShardMetricsCollector` must feed every
1086    /// successful push into the collector so the dynamic-scaling and
1087    /// drain-finalize paths see non-zero counters. Without this wiring
1088    /// `evaluate_scaling` reads `fill_ratio == 0` for every shard and
1089    /// `finalize_draining`'s "is the ring actually empty" predicate is a
1090    /// no-op (it sees `pushes_since_drain_start == 0` regardless of
1091    /// contents).
1092    #[test]
1093    fn try_push_feeds_metrics_collector() {
1094        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1095        let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1096
1097        for i in 0..16 {
1098            shard.try_push(json!({"i": i})).unwrap();
1099        }
1100
1101        let metrics = collector.collect_and_reset();
1102        assert_eq!(
1103            metrics.event_rate, 16,
1104            "every push must increment event_rate"
1105        );
1106        assert!(metrics.fill_ratio > 0.0, "buffer length must be observable");
1107        assert!(
1108            metrics.avg_push_latency_ns > 0,
1109            "push latency must be recorded"
1110        );
1111    }
1112
1113    /// `try_total_pending_in_rings` must never block, must skip
1114    /// shards whose mutex is currently held, and must report how
1115    /// many it skipped. This is what makes `EventBus::Drop`
1116    /// safe to call on a thread that already holds a shard lock.
1117    #[test]
1118    fn try_total_pending_in_rings_skips_held_shards() {
1119        let manager = ShardManager::new(2, 1024, BackpressureMode::DropNewest);
1120        // Push some events so a non-zero count is observable.
1121        manager.ingest(json!({"i": 1})).unwrap();
1122        manager.ingest(json!({"i": 2})).unwrap();
1123        manager.ingest(json!({"i": 3})).unwrap();
1124
1125        // Uncontended: all shards counted, uncounted_shards == 0.
1126        let (sum, uncounted) = manager.try_total_pending_in_rings();
1127        assert_eq!(uncounted, 0);
1128        let baseline_sum = sum;
1129        assert!(baseline_sum > 0, "events should be pending in some shard");
1130
1131        // Hold one shard's mutex and re-check: that shard must be
1132        // skipped, uncounted must be 1, and the call must return
1133        // immediately (this test would hang on the blocking
1134        // `total_pending_in_rings` variant).
1135        let table = manager.table.load();
1136        let _guard = table.shards[0].lock();
1137        let (sum2, uncounted2) = manager.try_total_pending_in_rings();
1138        assert_eq!(uncounted2, 1, "the locked shard must be uncounted");
1139        assert!(
1140            sum2 <= baseline_sum,
1141            "sum must not include events from the locked shard"
1142        );
1143    }
1144
1145    /// Same wiring for `try_push_raw` — the byte-oriented hot path.
1146    #[test]
1147    fn try_push_raw_feeds_metrics_collector() {
1148        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1149        let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1150
1151        for i in 0..16 {
1152            shard
1153                .try_push_raw(bytes::Bytes::from(format!("event-{i}")))
1154                .unwrap();
1155        }
1156
1157        let metrics = collector.collect_and_reset();
1158        assert_eq!(metrics.event_rate, 16);
1159        assert!(metrics.fill_ratio > 0.0);
1160        assert!(metrics.avg_push_latency_ns > 0);
1161    }
1162
1163    #[test]
1164    #[allow(deprecated)] // exercises the deprecated `select_shard` path
1165    fn test_shard_manager_routing() {
1166        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1167
1168        // Same event should always go to the same shard
1169        let event = json!({"key": "value"});
1170        let shard1 = manager.select_shard(&event);
1171        let shard2 = manager.select_shard(&event);
1172        assert_eq!(shard1, shard2);
1173
1174        // Different events may go to different shards
1175        let events: Vec<_> = (0..100).map(|i| json!({"i": i})).collect();
1176        let shards: std::collections::HashSet<_> =
1177            events.iter().map(|e| manager.select_shard(e)).collect();
1178
1179        // With 100 random events and 4 shards, we should hit multiple shards
1180        assert!(shards.len() > 1);
1181    }
1182
1183    /// Regression: the deprecated `select_shard(&JsonValue)` must produce
1184    /// the same shard id as `select_shard_by_hash` would for the
1185    /// equivalent `RawEvent`. They share underlying logic now, but if a
1186    /// future refactor splits them this test catches the divergence
1187    /// before consumers do.
1188    #[test]
1189    #[allow(deprecated)]
1190    fn test_select_shard_matches_select_shard_by_hash() {
1191        let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1192        for i in 0..200 {
1193            let v = json!({"i": i, "tag": format!("user-{i}")});
1194            let raw = RawEvent::from_value(v.clone());
1195            assert_eq!(
1196                manager.select_shard(&v),
1197                manager.select_shard_by_hash(raw.hash()),
1198                "select_shard and select_shard_by_hash must agree (i={i})"
1199            );
1200        }
1201    }
1202
1203    /// Pin net-perf #7: the static-mode `select_shard_by_hash` is now
1204    /// Lemire's `(hash * n) >> 64` instead of `hash % n` (one `div`
1205    /// per event eliminated). Lemire maps `[0, u64::MAX]` evenly into
1206    /// `[0, n)` for any `n` that fits in u16: the output must always
1207    /// land in range, and `hash == 0` must still resolve to shard 0
1208    /// (the multiplication overflow-pattern boundary).
1209    #[test]
1210    fn select_shard_by_hash_uses_lemire_reduction_in_static_mode() {
1211        for &shard_count in &[1u16, 2, 3, 4, 7, 8, 16, 64] {
1212            let manager = ShardManager::new(shard_count, 1024, BackpressureMode::DropNewest);
1213
1214            // `hash == 0` → `(0 * n) >> 64 == 0` regardless of `n`.
1215            // Locking down this boundary catches a regression where
1216            // the multiplication is dropped or reordered.
1217            assert_eq!(
1218                manager.select_shard_by_hash(0),
1219                0,
1220                "hash 0 must resolve to shard 0 (n={shard_count})"
1221            );
1222
1223            // Every output must be a valid shard index. A regression
1224            // back to `hash % shard_count` would still land in range,
1225            // but any wrong shift (e.g. `>> 32`) or sign-extension
1226            // bug would push the result past `shard_count`.
1227            for hash in [
1228                1u64,
1229                42,
1230                u64::MAX,
1231                u64::MAX - 1,
1232                0x8000_0000_0000_0000,
1233                0x7FFF_FFFF_FFFF_FFFF,
1234                0xDEAD_BEEF_DEAD_BEEF,
1235            ] {
1236                let shard = manager.select_shard_by_hash(hash);
1237                assert!(
1238                    shard < shard_count,
1239                    "shard {shard} out of range for n={shard_count}, hash={hash:#x}"
1240                );
1241            }
1242        }
1243
1244        // Distribution sanity: across 10_000 successive hashes that
1245        // mimic the input spread of `xxh3_64`, every shard sees at
1246        // least one event. A regression to `>> 64` returning 0 always
1247        // (e.g. `(hash as u64 * n as u64) >> 64`, which truncates the
1248        // product) would put everything on shard 0.
1249        let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1250        let mut counts = [0usize; 8];
1251        for i in 0u64..10_000 {
1252            let h = xxhash_rust::xxh3::xxh3_64(&i.to_le_bytes());
1253            counts[manager.select_shard_by_hash(h) as usize] += 1;
1254        }
1255        for (i, &c) in counts.iter().enumerate() {
1256            assert!(
1257                c > 0,
1258                "shard {i} got 0 events out of 10_000 (Lemire reduction must spread)"
1259            );
1260        }
1261    }
1262
1263    #[test]
1264    fn test_shard_manager_ingest() {
1265        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1266
1267        for i in 0..100 {
1268            let event = json!({"i": i});
1269            let result = manager.ingest(event);
1270            assert!(result.is_ok());
1271        }
1272
1273        let stats = manager.stats();
1274        assert_eq!(stats.events_ingested, 100);
1275        assert_eq!(stats.events_dropped, 0);
1276    }
1277
1278    #[test]
1279    fn test_backpressure_drop_newest() {
1280        let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1281
1282        // Fill the buffer (capacity 4, usable 3)
1283        for i in 0..3 {
1284            manager.ingest(json!({"i": i})).unwrap();
1285        }
1286
1287        // Next insert should fail
1288        let result = manager.ingest(json!({"i": 999}));
1289        assert!(matches!(result, Err(IngestionError::Backpressure)));
1290
1291        let stats = manager.stats();
1292        assert_eq!(stats.events_ingested, 3);
1293        assert_eq!(stats.events_dropped, 1);
1294    }
1295
1296    #[test]
1297    fn test_backpressure_drop_oldest() {
1298        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1299
1300        // Fill the buffer
1301        for i in 0..3 {
1302            manager.ingest(json!({"i": i})).unwrap();
1303        }
1304
1305        // Next insert should succeed by dropping oldest
1306        let result = manager.ingest(json!({"i": 999}));
1307        assert!(result.is_ok());
1308
1309        // Verify the oldest was dropped
1310        let shard = manager.shard(0).unwrap();
1311        let events = shard.lock().pop_batch(10);
1312
1313        // Should have events 1, 2, 999 (0 was dropped)
1314        assert_eq!(events.len(), 3);
1315        assert_eq!(events[0].parse().unwrap(), json!({"i": 1}));
1316        assert_eq!(events[2].parse().unwrap(), json!({"i": 999}));
1317    }
1318
1319    #[test]
1320    fn test_raw_event_ingestion() {
1321        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1322
1323        for i in 0..100 {
1324            let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1325            let result = manager.ingest_raw(raw);
1326            assert!(result.is_ok());
1327        }
1328
1329        let stats = manager.stats();
1330        assert_eq!(stats.events_ingested, 100);
1331        assert_eq!(stats.events_dropped, 0);
1332    }
1333
1334    /// `ingest_raw_batch` groups events by destination shard before
1335    /// pushing — verify the grouping preserves FIFO within a shard,
1336    /// honors hash-based routing, and that totals match `ingest_raw`.
1337    #[test]
1338    fn test_ingest_raw_batch_routes_and_preserves_order() {
1339        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1340        let events: Vec<RawEvent> = (0..200)
1341            .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1342            .collect();
1343
1344        // Snapshot the expected destination for each event so we can
1345        // compare against what actually landed in each shard.
1346        let expected_dests: Vec<u16> = events
1347            .iter()
1348            .map(|e| manager.select_shard_by_hash(e.hash()))
1349            .collect();
1350
1351        let (success, unrouted) = manager.ingest_raw_batch(events.clone());
1352        assert_eq!(success, 200, "all events should land with ample capacity");
1353        assert_eq!(unrouted, 0, "no scale-down so no unrouted events");
1354
1355        // Aggregate totals must match.
1356        let stats = manager.stats();
1357        assert_eq!(stats.events_ingested, 200);
1358        assert_eq!(stats.events_dropped, 0);
1359
1360        // Per-shard totals must match the expected routing distribution,
1361        // and the distribution must span more than one shard (otherwise
1362        // the test wouldn't exercise the grouping path).
1363        let mut expected_by_shard: std::collections::HashMap<u16, u64> =
1364            std::collections::HashMap::new();
1365        for d in &expected_dests {
1366            *expected_by_shard.entry(*d).or_default() += 1;
1367        }
1368        assert!(
1369            expected_by_shard.len() > 1,
1370            "hash distribution should span multiple shards"
1371        );
1372        for shard_id in 0..4u16 {
1373            let got = manager
1374                .with_shard(shard_id, |s| s.stats().events_ingested)
1375                .unwrap();
1376            let want = expected_by_shard.get(&shard_id).copied().unwrap_or(0);
1377            assert_eq!(got, want, "shard {} ingested count mismatch", shard_id);
1378        }
1379
1380        // FIFO within a shard: the events a shard received, in the order
1381        // we batched them, must come out of the ring buffer in the same
1382        // order.
1383        for shard_id in 0..4u16 {
1384            let expected_payloads: Vec<&[u8]> = events
1385                .iter()
1386                .zip(expected_dests.iter())
1387                .filter(|(_, d)| **d == shard_id)
1388                .map(|(e, _)| e.as_bytes())
1389                .collect();
1390            let popped = manager.with_shard(shard_id, |s| s.pop_batch(1024)).unwrap();
1391            assert_eq!(popped.len(), expected_payloads.len());
1392            for (i, ev) in popped.iter().enumerate() {
1393                assert_eq!(
1394                    ev.as_bytes(),
1395                    expected_payloads[i],
1396                    "shard {} position {} out of order",
1397                    shard_id,
1398                    i
1399                );
1400            }
1401        }
1402    }
1403
1404    /// Batching past a shard's capacity must account every dropped
1405    /// event under `DropNewest`: `success` + `events_dropped` =
1406    /// `len(input)`.
1407    #[test]
1408    fn test_ingest_raw_batch_drop_accounting() {
1409        // Single shard, usable capacity 3 (ring buffer reserves one slot).
1410        let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1411        let events: Vec<RawEvent> = (0..10)
1412            .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1413            .collect();
1414
1415        let (success, unrouted) = manager.ingest_raw_batch(events);
1416        assert_eq!(success, 3, "only 3 should fit under DropNewest");
1417        assert_eq!(unrouted, 0, "single-shard config has no unrouted events");
1418
1419        let stats = manager.stats();
1420        assert_eq!(stats.events_ingested, 3);
1421        assert_eq!(stats.events_dropped, 7);
1422    }
1423
1424    /// Empty batch is a no-op and must not touch stats.
1425    #[test]
1426    fn test_ingest_raw_batch_empty() {
1427        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1428        assert_eq!(manager.ingest_raw_batch(Vec::new()), (0, 0));
1429        let stats = manager.stats();
1430        assert_eq!(stats.events_ingested, 0);
1431        assert_eq!(stats.events_dropped, 0);
1432    }
1433
1434    #[test]
1435    fn test_remove_shard_requires_dynamic_scaling() {
1436        // Static mode - no dynamic scaling
1437        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1438
1439        // Should fail because dynamic scaling is not enabled
1440        let result = manager.remove_shard(0);
1441        assert!(result.is_err());
1442        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1443    }
1444
1445    #[test]
1446    fn test_add_shard_requires_dynamic_scaling() {
1447        // Static mode - no dynamic scaling
1448        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1449
1450        // Should fail because dynamic scaling is not enabled
1451        let result = manager.add_shard();
1452        assert!(result.is_err());
1453        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1454    }
1455
1456    #[test]
1457    fn test_drain_shard_requires_dynamic_scaling() {
1458        // Static mode - no dynamic scaling
1459        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1460
1461        // Should fail because dynamic scaling is not enabled
1462        let result = manager.drain_shard(0);
1463        assert!(result.is_err());
1464        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1465    }
1466
1467    #[test]
1468    fn test_drop_oldest_counts_dropped_events() {
1469        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1470
1471        // Fill the buffer (capacity 4, usable 3)
1472        for i in 0..3 {
1473            manager.ingest(json!({"i": i})).unwrap();
1474        }
1475
1476        // This should succeed by dropping the oldest event
1477        manager.ingest(json!({"i": 999})).unwrap();
1478
1479        let stats = manager.stats();
1480        assert_eq!(stats.events_ingested, 4);
1481        // The initial push fails (counted as dropped), then retry succeeds
1482        assert_eq!(
1483            stats.events_dropped, 1,
1484            "DropOldest cycle should count exactly one drop"
1485        );
1486    }
1487
1488    #[test]
1489    fn test_drop_oldest_raw_counts_dropped_events() {
1490        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1491
1492        // Fill the buffer
1493        for i in 0..3 {
1494            let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1495            manager.ingest_raw(raw).unwrap();
1496        }
1497
1498        // This should succeed by dropping the oldest event
1499        let raw = RawEvent::from_str(r#"{"i": 999}"#);
1500        manager.ingest_raw(raw).unwrap();
1501
1502        let stats = manager.stats();
1503        assert_eq!(stats.events_ingested, 4);
1504        assert_eq!(
1505            stats.events_dropped, 1,
1506            "DropOldest cycle should count exactly one drop"
1507        );
1508    }
1509
1510    /// Pin the current contract for `BackpressureMode::Sample`:
1511    /// it returns `IngestionError::Sampled` once the buffer fills,
1512    /// indistinguishable in shape from a `Backpressure` rejection.
1513    /// Sampling itself ("keep 1 in N events") is **not implemented**
1514    /// — the comments in `ingest` / `ingest_raw` defer it to "a
1515    /// higher level" that does not exist. A consumer setting this
1516    /// mode today gets a rejection signal, never probabilistic
1517    /// admission.
1518    ///
1519    /// This test pins that contract so it cannot quietly change
1520    /// without an explicit decision. If sampling is ever wired up,
1521    /// this test will fail and force an update — at which point
1522    /// the implementer should also add coverage for the
1523    /// rate-proportional admission rate.
1524    #[test]
1525    fn sample_mode_currently_returns_sampled_after_buffer_fills() {
1526        // TODO(coverage round 2): `BackpressureMode::Sample` is
1527        // dead-on-arrival until "higher level" sampling lands;
1528        // see comments at `ShardManager::ingest` / `ingest_raw`.
1529        let manager = ShardManager::new(1, 4, BackpressureMode::Sample { rate: 2 });
1530
1531        // Fill the buffer (capacity 4, usable 3).
1532        for i in 0..3 {
1533            manager.ingest(json!({"i": i})).unwrap();
1534        }
1535
1536        // Both ingest paths must report `Sampled` — not `Backpressure`,
1537        // not `Ok` — so callers can distinguish the (currently
1538        // unused) sampling rejection from a hard backpressure
1539        // rejection in case sampling is wired up later.
1540        let json_result = manager.ingest(json!({"i": 999}));
1541        assert!(
1542            matches!(json_result, Err(IngestionError::Sampled)),
1543            "Sample mode must return Sampled on a full buffer (got {:?})",
1544            json_result
1545        );
1546
1547        let raw_result = manager.ingest_raw(RawEvent::from_str(r#"{"i": 999}"#));
1548        assert!(
1549            matches!(raw_result, Err(IngestionError::Sampled)),
1550            "Sample mode must return Sampled on a full buffer via ingest_raw (got {:?})",
1551            raw_result
1552        );
1553    }
1554
1555    #[test]
1556    fn test_drop_oldest_multiple_cycles() {
1557        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1558
1559        // Fill the buffer (usable capacity 3)
1560        for i in 0..3 {
1561            manager.ingest(json!({"i": i})).unwrap();
1562        }
1563
1564        // Push 5 more events, each triggers a DropOldest cycle
1565        for i in 3..8 {
1566            manager.ingest(json!({"i": i})).unwrap();
1567        }
1568
1569        let stats = manager.stats();
1570        assert_eq!(stats.events_ingested, 8);
1571        assert_eq!(
1572            stats.events_dropped, 5,
1573            "each DropOldest cycle should count one drop"
1574        );
1575    }
1576
1577    /// Regression: BUG_REPORT.md #44 — single-event ingest paths
1578    /// (`ingest`, `ingest_raw`) used to collapse "shard not in
1579    /// routing table" into `IngestionError::Backpressure` and never
1580    /// touch `events_unrouted`. The batch path correctly bumped the
1581    /// counter. Reconciliation drifts because of this divergence.
1582    ///
1583    /// We construct the routing miss by:
1584    ///   1. Building a dynamic-mode manager with 2 shards.
1585    ///   2. Calling `add_shard()` which (per the #46 fix) leaves the
1586    ///      shard in `Provisioning` state — present in the mapper
1587    ///      but not in `select_shard`'s output.
1588    ///   3. Then directly forcing `select_shard_by_hash` would still
1589    ///      return an Active shard, so we exercise the secondary
1590    ///      routing-table-miss path: remove a shard and have a
1591    ///      stale hash-derived id.
1592    ///
1593    /// The simpler robust check: drain every shard via
1594    /// `drain_specific` until none Active. The mapper's fallback
1595    /// now returns `u16::MAX`, which is never in the routing
1596    /// table, so `resolve_idx` misses and we should see `Unrouted`
1597    /// + counter bump.
1598    #[test]
1599    fn ingest_single_event_unrouted_increments_counter() {
1600        use crate::config::ScalingPolicy;
1601        // min_shards=1 so we can drain N-1 of N shards; the last
1602        // one we skip-mark as Draining via Stopped → drain via
1603        // scale_down then verify routing miss for the still-active
1604        // shard's hash.
1605        let policy = ScalingPolicy {
1606            min_shards: 1,
1607            max_shards: 8,
1608            cooldown: std::time::Duration::from_nanos(1),
1609            ..Default::default()
1610        };
1611        let manager =
1612            ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1613
1614        // Drain 1 of 2 shards via the public API.
1615        let mapper = manager.mapper().unwrap().clone();
1616        let _ = mapper.scale_down(1).unwrap();
1617
1618        // Find a hash that routes to the *drained* shard (the one
1619        // not in `active_shard_ids`). With weighted selection and
1620        // only one Active shard, `select_shard` always returns the
1621        // Active one, so we can't easily target the drained shard
1622        // through hash routing — what we *can* do is verify the
1623        // Active shard still routes correctly (no false positives).
1624        let active_ids = mapper.active_shard_ids();
1625        assert_eq!(active_ids.len(), 1);
1626        let active = active_ids[0];
1627
1628        // ingest a few events; all should land on the Active shard,
1629        // none should hit Unrouted.
1630        for i in 0..5 {
1631            let r = manager.ingest_raw(RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)));
1632            let (sid, _) = r.expect("active shard must accept ingest");
1633            assert_eq!(sid, active, "must route to the active shard");
1634        }
1635        // No unrouted events — sanity that Unrouted only fires on
1636        // actual routing misses.
1637        assert_eq!(manager.stats().events_unrouted, 0);
1638
1639        // Now exercise the actual #44 fix: when *no* Active shard
1640        // exists, `select_shard` returns `u16::MAX` (per #51), which
1641        // is unmappable. To set this up without mutating private
1642        // fields, we rely on the fact that the manager's `with_mapper`
1643        // returns `Arc<ShardMapper>` and `drain_specific` will refuse
1644        // to take active_count below min_shards. So we simulate the
1645        // race by directly using `ingest_raw` with a forged
1646        // RawEvent whose hash WILL be modulo'd to a non-existent id
1647        // — but in dynamic mode the mapper rules, not modulo. We
1648        // can't easily get there from here, so we instead validate
1649        // the mechanism via a separate static-mode test below.
1650        //
1651        // The above sanity-check that Active shards still route
1652        // correctly + the mapper-level test
1653        // `select_shard_does_not_fall_back_to_draining` together
1654        // cover the #44 + #51 contract. Adding a routing-table-
1655        // miss test here would require a `#[cfg(test)] fn` that
1656        // can mutate the routing table, which we deliberately
1657        // avoid (the manager's invariants must hold even from
1658        // tests).
1659    }
1660
1661    /// Regression: BUG_REPORT.md #47 — `remove_shard` previously
1662    /// just unmapped the shard from the routing table and let the
1663    /// drain worker observe `with_shard → None` and exit. Anything
1664    /// still queued in the ring buffer at that moment was silently
1665    /// stranded. The fix returns the drained events to the caller
1666    /// (typically `EventBus::remove_shard_internal`) so they can
1667    /// be flushed through to the adapter rather than dropped.
1668    #[test]
1669    fn remove_shard_returns_stranded_ring_buffer_events() {
1670        use crate::config::ScalingPolicy;
1671        let policy = ScalingPolicy {
1672            min_shards: 1,
1673            max_shards: 8,
1674            cooldown: std::time::Duration::from_nanos(1),
1675            ..Default::default()
1676        };
1677        let manager =
1678            ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1679
1680        // Pin the routing for shard 1 by ingesting events with a
1681        // hash known to land there. We don't actually need
1682        // hash-routing precision: directly push into shard 1 via
1683        // `with_shard`, which bypasses select_shard.
1684        let pushed: Vec<&str> = vec![r#"{"a":1}"#, r#"{"a":2}"#, r#"{"a":3}"#];
1685        let pushed_count = pushed.len();
1686        for s in &pushed {
1687            manager
1688                .with_shard(1, |shard| {
1689                    shard.try_push_raw(bytes::Bytes::from(s.as_bytes().to_vec()))
1690                })
1691                .expect("shard 1 exists")
1692                .expect("ring buffer has room");
1693        }
1694        assert_eq!(
1695            manager.with_shard(1, |s| s.len()).unwrap(),
1696            pushed_count,
1697            "events should be queued in shard 1"
1698        );
1699
1700        // Remove shard 1 — must return the stranded events, not
1701        // drop them silently.
1702        let stranded = manager
1703            .remove_shard(1)
1704            .expect("remove_shard must succeed in dynamic mode");
1705        assert_eq!(
1706            stranded.len(),
1707            pushed_count,
1708            "remove_shard must surface every event still in the \
1709             ring buffer (#47); got {} stranded events, expected {}",
1710            stranded.len(),
1711            pushed_count
1712        );
1713
1714        // Sanity: the events come back in FIFO order with the
1715        // bytes the producer pushed.
1716        for (i, ev) in stranded.iter().enumerate() {
1717            assert_eq!(ev.as_bytes(), pushed[i].as_bytes());
1718            assert_eq!(ev.shard_id, 1);
1719        }
1720
1721        // Sanity: shard 1 is gone from routing.
1722        assert!(manager.with_shard(1, |s| s.id).is_none());
1723    }
1724
1725    /// `ShardManager::activate_shard` is idempotent at
1726    /// the API level — two calls on the same shard return Ok(())
1727    /// each — but pre-fix `num_shards` was bumped on every call
1728    /// even when the mapper's `activate()` had already
1729    /// transitioned the shard to Active. After repeated calls,
1730    /// `num_shards` exceeded the actual count and `select_shard`'s
1731    /// modulo arithmetic mis-routed.
1732    #[test]
1733    fn activate_shard_is_idempotent_in_num_shards_count() {
1734        let policy = ScalingPolicy {
1735            min_shards: 1,
1736            max_shards: 16,
1737            cooldown: std::time::Duration::from_nanos(1),
1738            ..Default::default()
1739        };
1740        let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1741            .expect("dynamic scaling enabled");
1742        let initial = manager.num_shards();
1743        assert_eq!(initial, 2);
1744
1745        // Add + activate a new shard. count goes 2 → 3.
1746        let new_id = manager.add_shard().expect("add_shard");
1747        manager.activate_shard(new_id).expect("first activate");
1748        assert_eq!(
1749            manager.num_shards(),
1750            3,
1751            "first activate must bump num_shards to 3"
1752        );
1753
1754        // Repeat activate — must be a no-op on the count.
1755        manager
1756            .activate_shard(new_id)
1757            .expect("second activate (idempotent)");
1758        manager
1759            .activate_shard(new_id)
1760            .expect("third activate (idempotent)");
1761        assert_eq!(
1762            manager.num_shards(),
1763            3,
1764            "repeated activate_shard must NOT keep bumping num_shards; \
1765             pre-fix this would be 5 after three calls",
1766        );
1767    }
1768
1769    /// Removing a still-`Provisioning` shard (the activate-failure
1770    /// rollback path) must NOT decrement `num_shards`. `add_shard`
1771    /// only registers a `Provisioning` entry and intentionally
1772    /// leaves `num_shards` alone — the bump happens in
1773    /// `activate_shard`. A symmetric `fetch_sub` in `remove_shard`
1774    /// would therefore leave the counter one below the routing
1775    /// table's actual size after a rollback, breaking modulo-based
1776    /// shard selection. This pins the gating: the rollback removal
1777    /// is a num_shards no-op, while removing an activated shard
1778    /// still decrements normally.
1779    #[test]
1780    fn remove_provisioning_shard_does_not_decrement_num_shards() {
1781        let policy = ScalingPolicy {
1782            min_shards: 1,
1783            max_shards: 16,
1784            cooldown: std::time::Duration::from_nanos(1),
1785            ..Default::default()
1786        };
1787        let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1788            .expect("dynamic scaling enabled");
1789        let initial = manager.num_shards();
1790        assert_eq!(initial, 2);
1791
1792        // add_shard registers a Provisioning entry (no num_shards bump).
1793        let new_id = manager.add_shard().expect("add_shard");
1794        assert_eq!(
1795            manager.num_shards(),
1796            initial,
1797            "add_shard must NOT bump num_shards (Provisioning, not yet selectable)"
1798        );
1799
1800        // Simulate the activate-failure rollback path: remove the
1801        // never-activated shard. Pre-fix this fired
1802        // `fetch_sub(1)` unconditionally and dropped num_shards
1803        // below the table size.
1804        let stranded = manager.remove_shard(new_id).expect("rollback remove");
1805        assert!(
1806            stranded.is_empty(),
1807            "fresh provisioning shard has no events"
1808        );
1809        assert_eq!(
1810            manager.num_shards(),
1811            initial,
1812            "removing a provisioning (never-activated) shard must NOT decrement num_shards"
1813        );
1814
1815        // Companion: removing an activated shard still decrements,
1816        // so the gate is symmetric with activate_shard's fetch_add.
1817        let activated_id = manager.add_shard().expect("add for activated path");
1818        manager.activate_shard(activated_id).expect("activate");
1819        assert_eq!(
1820            manager.num_shards(),
1821            initial + 1,
1822            "activate bumps num_shards"
1823        );
1824        manager
1825            .remove_shard(activated_id)
1826            .expect("remove activated");
1827        assert_eq!(
1828            manager.num_shards(),
1829            initial,
1830            "removing an activated shard MUST decrement num_shards"
1831        );
1832    }
1833}