net/shard/mod.rs
1//! Shard management for parallel event ingestion.
2//!
3//! The shard module provides:
4//! - Lock-free ring buffers for high-throughput event queuing
5//! - Per-shard timestamp generation (no cross-shard contention)
6//! - Batch assembly with adaptive sizing
7//! - Shard manager for coordinating multiple shards
8//! - Dynamic shard scaling with weighted producer routing
9
10mod batch;
11mod mapper;
12mod ring_buffer;
13
14pub use batch::{AdaptiveBatcher, BatchWorker};
15pub use mapper::{
16 ScalingDecision, ScalingError, ShardMapper, ShardMetrics, ShardMetricsCollector, ShardState,
17};
18// `RingBuffer` and `BufferFullError` are intentionally NOT re-exported.
19// External callers go through `EventBus` / `ShardManager`, which
20// uphold the SPSC contract via `Mutex<Shard>`. Exposing the raw ring
21// buffer publicly was a silent-UB footgun — anyone wrapping it in an
22// `Arc` and pushing from two threads got data corruption with no
23// compile-time signal. `BufferFullError` is not
24// re-exported here either: callers see it as `IngestionError::Backpressure`.
25pub(crate) use ring_buffer::RingBuffer;
26
27// Re-export ScalingPolicy from config for convenience
28pub use crate::config::ScalingPolicy;
29
30use bytes::Bytes;
31
32use crate::config::BackpressureMode;
33use crate::error::IngestionError;
34use crate::event::{InternalEvent, RawEvent};
35use crate::timestamp::TimestampGenerator;
36
37use serde_json::Value as JsonValue;
38use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
39use std::sync::Arc;
40use std::time::Instant;
41
/// Per-shard atomic counters.
///
/// These live behind an `Arc` outside of `Shard` so that
/// `ShardManager::stats()` can aggregate them without acquiring any
/// shard's mutex.
#[derive(Debug, Default)]
pub struct ShardCounters {
    /// Number of events accepted into this shard's ring buffer.
    pub events_ingested: AtomicU64,
    /// Number of events dropped because of backpressure.
    pub events_dropped: AtomicU64,
    /// Number of batches successfully dispatched to the adapter.
    pub batches_dispatched: AtomicU64,
}
54
/// Plain-value snapshot of shard statistics.
///
/// Used both for a single shard and for the totals aggregated across
/// all shards by the `ShardManager`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ShardStats {
    /// Total events ingested.
    pub events_ingested: u64,
    /// Events dropped due to backpressure.
    pub events_dropped: u64,
    /// Batches dispatched to the adapter.
    pub batches_dispatched: u64,
    /// Events that had no resolvable destination shard (e.g. the
    /// routing table was rebuilt mid-dispatch and the hashed shard id
    /// is no longer present). Such events cannot be attributed to a
    /// per-shard counter, so they are tracked on the `ShardManager`
    /// and only appear in its aggregated `stats()`.
    pub events_unrouted: u64,
}
71
72impl ShardCounters {
73 /// Load a consistent snapshot of the counters.
74 ///
75 /// `events_unrouted` is left at zero here — it is a manager-level
76 /// counter, not a per-shard one. `ShardManager::stats()` fills it
77 /// in after summing per-shard fields.
78 #[inline]
79 pub fn snapshot(&self) -> ShardStats {
80 ShardStats {
81 events_ingested: self.events_ingested.load(AtomicOrdering::Relaxed),
82 events_dropped: self.events_dropped.load(AtomicOrdering::Relaxed),
83 batches_dispatched: self.batches_dispatched.load(AtomicOrdering::Relaxed),
84 events_unrouted: 0,
85 }
86 }
87}
88
89/// A single shard with its own ring buffer and timestamp generator.
90pub struct Shard {
91 /// Shard identifier.
92 pub id: u16,
93 /// Ring buffer for event queuing.
94 ring_buffer: RingBuffer<InternalEvent>,
95 /// Shard-local timestamp generator (no contention).
96 timestamp_gen: TimestampGenerator,
97 /// Shared atomic counters (also referenced from `ShardTable` for
98 /// lock-free aggregation).
99 counters: Arc<ShardCounters>,
100 /// Optional metrics collector for dynamic scaling.
101 metrics_collector: Option<Arc<ShardMetricsCollector>>,
102 /// Ring buffer capacity (for metrics).
103 capacity: usize,
104}
105
106impl Shard {
107 /// Create a new shard.
108 pub fn new(id: u16, capacity: usize) -> Self {
109 Self {
110 id,
111 ring_buffer: RingBuffer::new(capacity),
112 timestamp_gen: TimestampGenerator::new(),
113 counters: Arc::new(ShardCounters::default()),
114 metrics_collector: None,
115 capacity,
116 }
117 }
118
119 /// Create a new shard with a metrics collector for dynamic scaling.
120 pub fn with_metrics(id: u16, capacity: usize, metrics: Arc<ShardMetricsCollector>) -> Self {
121 Self {
122 id,
123 ring_buffer: RingBuffer::new(capacity),
124 timestamp_gen: TimestampGenerator::new(),
125 counters: Arc::new(ShardCounters::default()),
126 metrics_collector: Some(metrics),
127 capacity,
128 }
129 }
130
131 /// Clone the atomic counter handle (for lock-free aggregation).
132 #[inline]
133 pub fn counters(&self) -> Arc<ShardCounters> {
134 self.counters.clone()
135 }
136
137 /// Set the metrics collector.
138 pub fn set_metrics_collector(&mut self, metrics: Arc<ShardMetricsCollector>) {
139 self.metrics_collector = Some(metrics);
140 }
141
142 /// Try to push a raw event (pre-serialized bytes) into the shard's ring buffer.
143 /// Returns the assigned insertion timestamp on success.
144 ///
145 /// This is the fastest ingestion path - no serialization or hashing needed.
146 #[inline]
147 pub fn try_push_raw(&mut self, raw: Bytes) -> Result<u64, IngestionError> {
148 let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
149 let ts = self.timestamp_gen.next();
150 let event = InternalEvent::new(raw, ts, self.id);
151
152 match self.ring_buffer.try_push(event) {
153 Ok(()) => {
154 self.counters
155 .events_ingested
156 .fetch_add(1, AtomicOrdering::Relaxed);
157 if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
158 collector.record_push(start.elapsed().as_nanos() as u64);
159 collector.record_buffer_len(self.ring_buffer.len());
160 }
161 Ok(ts)
162 }
163 Err(_) => {
164 self.counters
165 .events_dropped
166 .fetch_add(1, AtomicOrdering::Relaxed);
167 Err(IngestionError::Backpressure)
168 }
169 }
170 }
171
172 /// Try to push a JSON value into the shard's ring buffer.
173 /// Returns the assigned insertion timestamp on success.
174 ///
175 /// This serializes the value once before storing.
176 #[inline]
177 pub fn try_push(&mut self, raw: JsonValue) -> Result<u64, IngestionError> {
178 let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
179 let ts = self.timestamp_gen.next();
180 let event = InternalEvent::from_value(raw, ts, self.id);
181
182 match self.ring_buffer.try_push(event) {
183 Ok(()) => {
184 self.counters
185 .events_ingested
186 .fetch_add(1, AtomicOrdering::Relaxed);
187 if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
188 collector.record_push(start.elapsed().as_nanos() as u64);
189 collector.record_buffer_len(self.ring_buffer.len());
190 }
191 Ok(ts)
192 }
193 Err(_) => {
194 self.counters
195 .events_dropped
196 .fetch_add(1, AtomicOrdering::Relaxed);
197 Err(IngestionError::Backpressure)
198 }
199 }
200 }
201
202 /// Pop a batch of events from the ring buffer.
203 ///
204 /// Allocates a fresh `Vec`. Prefer [`pop_batch_into`] in drain
205 /// loops where the per-cycle `Vec` allocation should happen
206 /// outside the shard mutex.
207 ///
208 /// [`pop_batch_into`]: Self::pop_batch_into
209 #[inline]
210 pub fn pop_batch(&mut self, max: usize) -> Vec<InternalEvent> {
211 let out = self.ring_buffer.pop_batch(max);
212 if let Some(collector) = &self.metrics_collector {
213 collector.record_buffer_len(self.ring_buffer.len());
214 }
215 out
216 }
217
218 /// Pop a batch of events into a caller-owned buffer.
219 ///
220 /// Append semantics: does **not** clear `dst`; reserves
221 /// `count` slots and pushes drained elements onto the end.
222 /// Returns the number drained this call. Use this in
223 /// steady-state drain loops where the caller keeps a scratch
224 /// `Vec` across cycles, so the per-cycle allocation moves out
225 /// of the consumer's critical section.
226 #[inline]
227 pub fn pop_batch_into(&mut self, dst: &mut Vec<InternalEvent>, max: usize) -> usize {
228 let n = self.ring_buffer.pop_batch_into(dst, max);
229 if let Some(collector) = &self.metrics_collector {
230 collector.record_buffer_len(self.ring_buffer.len());
231 }
232 n
233 }
234
235 /// Try to pop a single event from the ring buffer.
236 #[inline]
237 pub fn try_pop(&mut self) -> Option<InternalEvent> {
238 let out = self.ring_buffer.try_pop();
239 if let Some(collector) = &self.metrics_collector {
240 collector.record_buffer_len(self.ring_buffer.len());
241 }
242 out
243 }
244
245 /// Producer-side eviction of the oldest event.
246 ///
247 /// Used by `BackpressureMode::DropOldest` to make room for a
248 /// new push when the buffer is full. Bypasses the ring buffer's
249 /// consumer-thread tracking (the producer thread is calling
250 /// what is normally a consumer-side operation). Safe because
251 /// the outer shard mutex serializes this against any concurrent
252 /// `try_pop` from the legitimate consumer (the batch worker).
253 #[inline]
254 pub(crate) fn evict_oldest(&mut self) -> Option<InternalEvent> {
255 self.ring_buffer.evict_oldest()
256 }
257
258 /// Get the current buffer length.
259 #[inline]
260 pub fn len(&self) -> usize {
261 self.ring_buffer.len()
262 }
263
264 /// Check if the buffer is empty.
265 #[inline]
266 pub fn is_empty(&self) -> bool {
267 self.ring_buffer.is_empty()
268 }
269
270 /// Check if the buffer is full.
271 #[inline]
272 pub fn is_full(&self) -> bool {
273 self.ring_buffer.is_full()
274 }
275
276 /// Get the fill ratio (0.0 - 1.0).
277 #[inline]
278 pub fn fill_ratio(&self) -> f64 {
279 if self.capacity == 0 {
280 0.0
281 } else {
282 self.ring_buffer.len() as f64 / self.capacity as f64
283 }
284 }
285
286 /// Get the ring buffer capacity.
287 #[inline]
288 pub fn capacity(&self) -> usize {
289 self.capacity
290 }
291
292 /// Get a snapshot of shard statistics.
293 pub fn stats(&self) -> ShardStats {
294 self.counters.snapshot()
295 }
296
297 /// Record a batch dispatch.
298 pub fn record_batch_dispatch(&self) {
299 self.counters
300 .batches_dispatched
301 .fetch_add(1, AtomicOrdering::Relaxed);
302 }
303}
304
305/// Immutable routing table: shards + index + counter handles.
306///
307/// Placed behind an `ArcSwap` on `ShardManager` so the common read
308/// path (`ingest`, `ingest_raw`, `with_shard`, `stats`) is
309/// lock-free. Rebuilt on scale up/down via RCU-style swap.
310pub struct ShardTable {
311 /// All shards, indexed by position. `Arc<Mutex<Shard>>` lets a new
312 /// table share shard handles with the previous table (cheap Arc
313 /// clones during rebuild).
314 shards: Vec<Arc<parking_lot::Mutex<Shard>>>,
315 /// Parallel vector of counter handles. Exposes stats without
316 /// locking the shard mutex.
317 counters: Vec<Arc<ShardCounters>>,
318 /// Map from shard ID to index in `shards`/`counters`.
319 shard_index: std::collections::HashMap<u16, usize>,
320}
321
322impl ShardTable {
323 fn new(shards: Vec<Shard>) -> Self {
324 let mut shard_index = std::collections::HashMap::with_capacity(shards.len());
325 let mut counters = Vec::with_capacity(shards.len());
326 let shards: Vec<_> = shards
327 .into_iter()
328 .enumerate()
329 .map(|(idx, s)| {
330 shard_index.insert(s.id, idx);
331 counters.push(s.counters());
332 Arc::new(parking_lot::Mutex::new(s))
333 })
334 .collect();
335 Self {
336 shards,
337 counters,
338 shard_index,
339 }
340 }
341}
342
343/// Manager for multiple shards.
344///
345/// The ShardManager can operate in two modes:
346/// 1. Static mode (default): Fixed number of shards, simple hash-based routing
347/// 2. Dynamic mode: Shards can be added/removed based on load, weighted routing
348pub struct ShardManager {
349 /// Routing table. Swapped atomically on scale up/down so readers
350 /// never see a partially-updated `(shards, shard_index)` pair.
351 table: arc_swap::ArcSwap<ShardTable>,
352 /// Current number of active shards.
353 num_shards: std::sync::atomic::AtomicU16,
354 /// Backpressure mode.
355 backpressure_mode: BackpressureMode,
356 /// Ring buffer capacity for new shards.
357 ring_buffer_capacity: usize,
358 /// Optional shard mapper for dynamic scaling.
359 mapper: Option<Arc<ShardMapper>>,
360 /// Serializes concurrent `add_shard` / `remove_shard` rebuilds.
361 /// Not on the ingest path.
362 rebuild_lock: parking_lot::Mutex<()>,
363 /// Events dropped because no destination shard was resolvable.
364 /// Distinct from per-shard `events_dropped` (which tracks
365 /// backpressure on a known shard) — this counts events whose
366 /// hashed shard id was missing from the routing table at lookup
367 /// time, e.g. due to a concurrent scale-down. Surfaced via
368 /// `stats().events_unrouted`.
369 events_unrouted: AtomicU64,
370}
371
372impl ShardManager {
373 /// Create a new shard manager (static mode).
374 pub fn new(
375 num_shards: u16,
376 ring_buffer_capacity: usize,
377 backpressure_mode: BackpressureMode,
378 ) -> Self {
379 let shards: Vec<Shard> = (0..num_shards)
380 .map(|id| Shard::new(id, ring_buffer_capacity))
381 .collect();
382
383 Self {
384 table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
385 num_shards: std::sync::atomic::AtomicU16::new(num_shards),
386 backpressure_mode,
387 ring_buffer_capacity,
388 mapper: None,
389 rebuild_lock: parking_lot::Mutex::new(()),
390 events_unrouted: AtomicU64::new(0),
391 }
392 }
393
394 /// Create a new shard manager with dynamic scaling enabled.
395 pub fn with_mapper(
396 num_shards: u16,
397 ring_buffer_capacity: usize,
398 backpressure_mode: BackpressureMode,
399 policy: ScalingPolicy,
400 ) -> Result<Self, ScalingError> {
401 let mapper = Arc::new(ShardMapper::new(num_shards, ring_buffer_capacity, policy)?);
402
403 let shards: Vec<Shard> = (0..num_shards)
404 .map(|id| {
405 let metrics = mapper.metrics_collector(id).ok_or_else(|| {
406 ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", id))
407 })?;
408 Ok(Shard::with_metrics(id, ring_buffer_capacity, metrics))
409 })
410 .collect::<Result<Vec<_>, ScalingError>>()?;
411
412 Ok(Self {
413 table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
414 num_shards: std::sync::atomic::AtomicU16::new(num_shards),
415 backpressure_mode,
416 ring_buffer_capacity,
417 mapper: Some(mapper),
418 rebuild_lock: parking_lot::Mutex::new(()),
419 events_unrouted: AtomicU64::new(0),
420 })
421 }
422
423 /// Get the shard mapper (if dynamic scaling is enabled).
424 pub fn mapper(&self) -> Option<&Arc<ShardMapper>> {
425 self.mapper.as_ref()
426 }
427
428 /// Get the number of active shards.
429 #[inline]
430 pub fn num_shards(&self) -> u16 {
431 self.num_shards.load(std::sync::atomic::Ordering::Acquire)
432 }
433
434 /// Get the backpressure mode.
435 #[inline]
436 pub fn backpressure_mode(&self) -> BackpressureMode {
437 self.backpressure_mode
438 }
439
440 /// Select a shard for an event based on its content hash.
441 /// Uses weighted selection if dynamic scaling is enabled.
442 ///
443 /// **Prefer [`select_shard_by_hash`].** This method serializes the
444 /// `JsonValue` to bytes just to compute the hash; if you already
445 /// have a `RawEvent` (or any pre-computed `xxh3_64` of the
446 /// canonical bytes), pass that hash directly. The internal
447 /// ingest paths all do — this method exists for ad-hoc external
448 /// callers that haven't yet adopted the `RawEvent` pattern.
449 ///
450 /// [`select_shard_by_hash`]: Self::select_shard_by_hash
451 #[inline]
452 #[deprecated(
453 since = "0.10.0",
454 note = "serializes the value just to hash it; prefer `RawEvent::from_value(v).hash()` + `select_shard_by_hash` to avoid the duplicate serialization"
455 )]
456 #[expect(
457 clippy::expect_used,
458 reason = "serde_json::to_vec on a JsonValue (which round-tripped from JSON or was built via the type's own constructors) is infallible"
459 )]
460 pub fn select_shard(&self, event: &JsonValue) -> u16 {
461 // Use xxhash for fast, deterministic hashing. `to_vec` avoids the
462 // extra UTF-8 validation that `to_string` performs on the serialized
463 // buffer, since we only need the bytes for hashing.
464 let bytes = serde_json::to_vec(event).expect("Value serialization is infallible");
465 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
466 self.select_shard_by_hash(hash)
467 }
468
469 /// Select a shard using a pre-computed hash.
470 ///
471 /// This is faster than `select_shard` when you already have the hash.
472 #[inline]
473 pub fn select_shard_by_hash(&self, hash: u64) -> u16 {
474 if let Some(ref mapper) = self.mapper {
475 // Dynamic mode: use weighted selection
476 mapper.select_shard(hash)
477 } else {
478 // Static mode: Lemire's bias-free multiply-shift mapping.
479 // Pre-fix this ran `hash % num_shards` per event — modulo
480 // by a non-constant `u64` is a `div` on every modern uarch
481 // (~20-25 cycles) and dwarfs the upstream xxh3 hash cost
482 // for the static-mode hot path. The multiply-shift form
483 // `((hash * n) >> 64)` is ~3 cycles and is the same
484 // unbiased reduction already used in `mapper.rs:660`. Per
485 // net-perf #7.
486 //
487 // The defensive guard against `num_shards == 0` stays —
488 // config validation rejects 0 at startup and `scale_down`
489 // requires `current > min_shards >= 1` so this branch is
490 // unreachable today, but a stray 0 here would otherwise
491 // make the multiply land at index 0 silently while the
492 // legacy modulo would have panicked. Returning 0
493 // explicitly preserves the legacy "any select returns
494 // shard 0" failure mode without the panic.
495 let num_shards = self.num_shards.load(std::sync::atomic::Ordering::Acquire);
496 debug_assert!(num_shards > 0, "num_shards must be > 0");
497 if num_shards == 0 {
498 return 0;
499 }
500 ((hash as u128 * num_shards as u128) >> 64) as u16
501 }
502 }
503
504 /// Resolve a shard ID to its table index, using the fast path in
505 /// static mode (shard_id == index).
506 #[inline]
507 fn resolve_idx(&self, table: &ShardTable, shard_id: u16) -> Option<usize> {
508 if self.mapper.is_none() {
509 Some(shard_id as usize)
510 } else {
511 table.shard_index.get(&shard_id).copied()
512 }
513 }
514
515 /// Push `raw` into `shard`, handling backpressure. Only clones the
516 /// bytes when `DropOldest` needs them for the retry path.
517 #[inline]
518 fn push_with_backpressure(
519 &self,
520 shard: &mut Shard,
521 shard_id: u16,
522 raw: Bytes,
523 ) -> Result<(u16, u64), IngestionError> {
524 match self.backpressure_mode {
525 BackpressureMode::DropOldest => match shard.try_push_raw(raw.clone()) {
526 Ok(ts) => Ok((shard_id, ts)),
527 Err(IngestionError::Backpressure) => {
528 // The failed try_push_raw incremented events_dropped for
529 // the *new* event, but the new event isn't actually
530 // dropped — the oldest is. Correct the stats: undo the
531 // spurious drop count, evict the oldest (which is the real
532 // drop), and retry with the same ref-counted bytes.
533 //
534 // Use the producer-side `evict_oldest` rather
535 // than `try_pop`. Calling `try_pop` from the
536 // producer thread would violate the SPSC consumer
537 // contract (the
538 // legitimate consumer is the batch worker, on a
539 // different task / thread).
540 //
541 // Transient stats note: a concurrent reader of
542 // `manager.stats().events_dropped` between the
543 // `fetch_sub` and the second `fetch_add` would
544 // briefly observe the pre-correction value
545 // (one less than reality). The net delta over
546 // the whole retry is `+1`, matching the real
547 // drop. Documented as snapshot-not-coherent
548 // per `ShardCounters::snapshot`'s contract.
549 shard
550 .counters
551 .events_dropped
552 .fetch_sub(1, AtomicOrdering::Relaxed);
553 let _ = shard.evict_oldest();
554 shard
555 .counters
556 .events_dropped
557 .fetch_add(1, AtomicOrdering::Relaxed);
558 shard.try_push_raw(raw).map(|ts| (shard_id, ts))
559 }
560 Err(e) => Err(e),
561 },
562 BackpressureMode::Sample { .. } => match shard.try_push_raw(raw) {
563 Ok(ts) => Ok((shard_id, ts)),
564 Err(IngestionError::Backpressure) => Err(IngestionError::Sampled),
565 Err(e) => Err(e),
566 },
567 BackpressureMode::DropNewest | BackpressureMode::FailProducer => {
568 shard.try_push_raw(raw).map(|ts| (shard_id, ts))
569 }
570 }
571 }
572
573 /// Ingest an event into the appropriate shard.
574 pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError> {
575 // Serialize once upfront - avoids clone on retry
576 let raw = Bytes::from(serde_json::to_vec(&event)?);
577 let hash = xxhash_rust::xxh3::xxh3_64(&raw);
578 let shard_id = self.select_shard_by_hash(hash);
579
580 let table = self.table.load();
581 // Surface "no routable destination" as `Unrouted` (not
582 // `Backpressure`) and bump the manager-level
583 // `events_unrouted` counter so per-event vs. batch-path
584 // accounting agree. The secondary `table.shards.get(idx)`
585 // miss should be impossible by the `shard_index ↔ shards`
586 // invariant — keep returning `Unrouted` defensively rather
587 // than panicking.
588 let Some(idx) = self.resolve_idx(&table, shard_id) else {
589 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
590 return Err(IngestionError::Unrouted);
591 };
592 let Some(shard_lock) = table.shards.get(idx) else {
593 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
594 return Err(IngestionError::Unrouted);
595 };
596
597 let mut shard = shard_lock.lock();
598 self.push_with_backpressure(&mut shard, shard_id, raw)
599 }
600
601 /// Ingest a raw event (pre-serialized with cached hash).
602 ///
603 /// This is the fastest ingestion path:
604 /// - Uses pre-computed hash for shard selection (no serialization)
605 /// - Stores bytes directly (no clone needed, reference-counted)
606 #[inline]
607 pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError> {
608 let shard_id = self.select_shard_by_hash(event.hash());
609
610 let table = self.table.load();
611 // See `ingest` above for the `Unrouted` rationale.
612 let Some(idx) = self.resolve_idx(&table, shard_id) else {
613 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
614 return Err(IngestionError::Unrouted);
615 };
616 let Some(shard_lock) = table.shards.get(idx) else {
617 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
618 return Err(IngestionError::Unrouted);
619 };
620
621 let mut shard = shard_lock.lock();
622 self.push_with_backpressure(&mut shard, shard_id, event.bytes())
623 }
624
625 /// Ingest a batch of pre-serialized events, grouped by shard.
626 ///
627 /// Each destination shard's mutex is acquired once and all of that
628 /// shard's events are pushed before releasing. With a uniform hash
629 /// distribution this amortizes lock acquisitions from O(events) to
630 /// O(shards). Backpressure semantics match per-event `ingest_raw`.
631 ///
632 /// Returns `(success, unrouted)` where `success` is the count of
633 /// events successfully pushed onto a shard's ring buffer and
634 /// `unrouted` is the count of events whose destination shard was
635 /// not present in the routing table at the time of dispatch
636 /// (e.g. concurrent scale-down). The remainder
637 /// (`total - success - unrouted`) is the backpressure-class drop
638 /// count.
639 ///
640 /// Returns `(success, unrouted)` rather than just `success`
641 /// so the bus can subtract `unrouted` before publishing
642 /// `events_dropped`. Returning only `success` would let the
643 /// bus's `dropped = total - success` accounting double-count
644 /// unrouted events — they're already tallied on
645 /// `events_unrouted` inside this function.
646 pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize) {
647 if events.is_empty() {
648 return (0, 0);
649 }
650
651 let table = self.table.load();
652
653 // Bucket by table index. Using a Vec<Vec<_>> keyed by index is
654 // cheaper than a HashMap for the common case of a small
655 // shard count.
656 let mut groups: Vec<Vec<Bytes>> = (0..table.shards.len()).map(|_| Vec::new()).collect();
657 let mut group_ids: Vec<u16> = vec![0; groups.len()];
658
659 let mut unrouted = 0usize;
660 for event in events {
661 let shard_id = self.select_shard_by_hash(event.hash());
662 let Some(idx) = self.resolve_idx(&table, shard_id) else {
663 // Routing table doesn't contain the chosen shard
664 // (e.g. concurrent scale-down removed it). The drop
665 // can't be attributed to a per-shard counter; track
666 // it on the manager-level `events_unrouted` so
667 // bus-level vs. per-shard reconciliation is exact.
668 unrouted += 1;
669 continue;
670 };
671 if let Some(g) = groups.get_mut(idx) {
672 if g.is_empty() {
673 group_ids[idx] = shard_id;
674 }
675 g.push(event.bytes());
676 }
677 }
678 if unrouted > 0 {
679 self.events_unrouted
680 .fetch_add(unrouted as u64, AtomicOrdering::Relaxed);
681 }
682
683 let mut success = 0usize;
684 for (idx, group) in groups.into_iter().enumerate() {
685 if group.is_empty() {
686 continue;
687 }
688 let shard_id = group_ids[idx];
689 let Some(shard_lock) = table.shards.get(idx) else {
690 continue;
691 };
692 let mut shard = shard_lock.lock();
693 for bytes in group {
694 if self
695 .push_with_backpressure(&mut shard, shard_id, bytes)
696 .is_ok()
697 {
698 success += 1;
699 }
700 }
701 }
702
703 (success, unrouted)
704 }
705
706 /// Get a reference to a shard by ID.
707 pub fn shard(&self, id: u16) -> Option<ShardRef> {
708 let table = self.table.load();
709 let idx = self.resolve_idx(&table, id)?;
710 let shard = table.shards.get(idx)?.clone();
711 Some(ShardRef { shard })
712 }
713
714 /// Execute a function with exclusive access to a shard.
715 pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
716 where
717 F: FnOnce(&mut Shard) -> R,
718 {
719 let table = self.table.load();
720 let idx = self.resolve_idx(&table, id)?;
721 table.shards.get(idx).map(|shard_lock| {
722 let mut shard = shard_lock.lock();
723 f(&mut shard)
724 })
725 }
726
727 /// Returns true if every shard's ring buffer is empty.
728 ///
729 /// Cheaper than `shard_ids()` + repeated `with_shard`: loads the
730 /// routing table once and checks each shard behind a brief lock.
731 pub fn all_shards_empty(&self) -> bool {
732 let table = self.table.load();
733 table.shards.iter().all(|s| s.lock().is_empty())
734 }
735
736 /// Iterate over all active shard IDs.
737 pub fn shard_ids(&self) -> Vec<u16> {
738 self.table.load().shard_index.keys().copied().collect()
739 }
740
741 /// Sum of `len()` across every shard's ring buffer.
742 pub fn total_pending_in_rings(&self) -> u64 {
743 let table = self.table.load();
744 table.shards.iter().map(|s| s.lock().len() as u64).sum()
745 }
746
747 /// Best-effort variant of [`Self::total_pending_in_rings`] that
748 /// never blocks: every shard whose mutex is currently held is
749 /// skipped (counted as zero). Use this from `Drop` or any path
750 /// that may run on a thread already holding a shard lock
751 /// (single-thread runtime + panic during shutdown is the
752 /// canonical hazard); the blocking variant would self-deadlock
753 /// there.
754 ///
755 /// Returns `(sum_counted, uncounted_shard_count)` so the caller
756 /// can log the uncertainty in the result.
757 pub fn try_total_pending_in_rings(&self) -> (u64, usize) {
758 let table = self.table.load();
759 let mut sum: u64 = 0;
760 let mut uncounted: usize = 0;
761 for s in table.shards.iter() {
762 match s.try_lock() {
763 Some(guard) => sum += guard.len() as u64,
764 None => uncounted += 1,
765 }
766 }
767 (sum, uncounted)
768 }
769
770 /// Get aggregated statistics from all shards.
771 ///
772 /// Lock-free: reads each shard's atomic counters directly via the
773 /// parallel `counters` vector on the routing table, with no per-
774 /// shard mutex acquisition. `events_unrouted` is sourced from the
775 /// `ShardManager` itself rather than the per-shard counters since
776 /// unrouted events have no shard to attribute to.
777 pub fn stats(&self) -> ShardStats {
778 let table = self.table.load();
779 let mut total = ShardStats::default();
780 for counters in table.counters.iter() {
781 let snap = counters.snapshot();
782 total.events_ingested += snap.events_ingested;
783 total.events_dropped += snap.events_dropped;
784 total.batches_dispatched += snap.batches_dispatched;
785 }
786 total.events_unrouted = self.events_unrouted.load(AtomicOrdering::Relaxed);
787 total
788 }
789
790 /// Rebuild the routing table with a closure that sees the old
791 /// `(shards, counters, shard_index)` and produces the new ones.
792 /// Serialized by `rebuild_lock` so concurrent scaling operations
793 /// can't race on read-modify-write of the table.
794 fn rebuild_table<F>(&self, f: F)
795 where
796 F: FnOnce(
797 &Vec<Arc<parking_lot::Mutex<Shard>>>,
798 &Vec<Arc<ShardCounters>>,
799 &std::collections::HashMap<u16, usize>,
800 ) -> ShardTable,
801 {
802 let _guard = self.rebuild_lock.lock();
803 let old = self.table.load();
804 let new = f(&old.shards, &old.counters, &old.shard_index);
805 self.table.store(Arc::new(new));
806 }
807
808 /// Add a new shard (for dynamic scaling).
809 /// Returns the new shard ID. The shard is in the routing table
810 /// and ready to be the destination of `select_shard` calls
811 /// **only after** [`activate_shard`] is called for it.
812 ///
813 /// Previously the mapper marked the shard `Active` *before* the
814 /// routing table was rebuilt and *before* any worker was wired up
815 /// to drain its ring buffer. Producers could `select_shard` to
816 /// the new id, push into its ring buffer, and have the events
817 /// stranded with no consumer. The fix uses
818 /// `scale_up_provisioning` so the mapper records the shard but
819 /// `select_shard` skips it, then `activate_shard` flips it to
820 /// `Active` once workers are ready.
821 ///
822 /// [`activate_shard`]: Self::activate_shard
823 pub fn add_shard(&self) -> Result<u16, ScalingError> {
824 self.add_shard_inner(false)
825 }
826
827 /// Like [`add_shard`] but bypasses the auto-scaling cooldown.
828 ///
829 /// Used by operator-initiated `manual_scale_up` paths. The
830 /// auto-scaling cooldown protects against the auto-scaling
831 /// monitor reacting too quickly to transient load spikes;
832 /// a deliberate operator action should not be rate-limited
833 /// by that cadence. The `max_shards` budget check still
834 /// applies.
835 ///
836 /// [`add_shard`]: Self::add_shard
837 pub fn add_shard_force(&self) -> Result<u16, ScalingError> {
838 self.add_shard_inner(true)
839 }
840
841 fn add_shard_inner(&self, force: bool) -> Result<u16, ScalingError> {
842 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
843 "Dynamic scaling not enabled".into(),
844 ))?;
845
846 // Allocate the shard in `Provisioning` state — not yet
847 // selectable.
848 let new_ids = if force {
849 mapper.scale_up_provisioning_force(1)?
850 } else {
851 mapper.scale_up_provisioning(1)?
852 };
853 let new_id = new_ids[0];
854
855 let metrics = mapper.metrics_collector(new_id).ok_or_else(|| {
856 ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", new_id))
857 })?;
858 let new_shard = Shard::with_metrics(new_id, self.ring_buffer_capacity, metrics);
859 let new_counters = new_shard.counters();
860 let new_shard = Arc::new(parking_lot::Mutex::new(new_shard));
861
862 // Publish to the routing table so `with_shard` works (the
863 // drain worker the caller is about to spawn needs this) but
864 // the shard is still `Provisioning` so `select_shard` will
865 // not route producer pushes to it yet.
866 self.rebuild_table(|shards, counters, shard_index| {
867 let mut shards = shards.clone();
868 let mut counters = counters.clone();
869 let mut shard_index = shard_index.clone();
870 let idx = shards.len();
871 shards.push(new_shard.clone());
872 counters.push(new_counters.clone());
873 shard_index.insert(new_id, idx);
874 ShardTable {
875 shards,
876 counters,
877 shard_index,
878 }
879 });
880
881 // Don't bump `num_shards` yet — `activate_shard` does that
882 // when the shard becomes selectable.
883 Ok(new_id)
884 }
885
886 /// Activate a previously-provisioned shard. After this returns,
887 /// `select_shard` will route to the shard and producer pushes
888 /// will land in its ring buffer.
889 ///
890 /// Idempotent: calling on an already-`Active` shard is `Ok(())`.
891 ///
892 /// Pre-fix this unconditionally `fetch_add(1)`d
893 /// `num_shards` even when the mapper's `activate()` early-
894 /// returned for an already-`Active` shard. After repeated
895 /// activate calls, `num_shards` exceeded both the mapper's
896 /// `active_count` and the actual shard count, breaking
897 /// modulo-based shard selection (`select_shard`) and
898 /// producing stale routing decisions. Post-fix gates the
899 /// `fetch_add` on the mapper's transition signal.
900 pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
901 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
902 "Dynamic scaling not enabled".into(),
903 ))?;
904 let transitioned = mapper.activate(shard_id)?;
905 if transitioned {
906 self.num_shards
907 .fetch_add(1, std::sync::atomic::Ordering::Release);
908 }
909 Ok(())
910 }
911
912 /// Start draining a shard (for dynamic scaling).
913 ///
914 /// Previously only flipped the metrics collector's `draining`
915 /// atomic, leaving `MappedShard.state` untouched. Result:
916 /// `select_shard` (which filters on `state == Active`) still
917 /// routed new producers to the shard. The fix calls into the
918 /// mapper, which atomically transitions the state to `Draining`
919 /// and (for accounting) decrements `active_count`, mirroring
920 /// `scale_down(N)` for a single targeted shard.
921 pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
922 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
923 "Dynamic scaling not enabled".into(),
924 ))?;
925 mapper.drain_specific(shard_id)
926 }
927
928 /// Remove a shard from the routing table.
929 ///
930 /// Previously this only unmapped the shard from the routing
931 /// table. The drain worker, on its next `with_shard` call,
932 /// observed `None` and exited — leaving any events still in the
933 /// ring buffer permanently stranded. The fix drains the ring
934 /// buffer into a caller-supplied scratch `Vec` **before** the
935 /// unmap, then returns the drained events so the caller
936 /// (typically `EventBus::remove_shard_internal`) can flush them
937 /// through to the adapter rather than dropping them.
938 ///
939 /// Returns `Ok(events)` where `events` is whatever was still
940 /// queued in the ring buffer at unmap time (possibly empty).
941 /// Caller is responsible for handing those off to the adapter.
942 pub fn remove_shard(
943 &self,
944 shard_id: u16,
945 ) -> Result<Vec<crate::event::InternalEvent>, ScalingError> {
946 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
947 "Dynamic scaling not enabled".into(),
948 ))?;
949
950 // Capture the mapper-side state *before* we unmap. This
951 // gates the `num_shards` decrement at the end so it stays
952 // symmetric with `activate_shard`'s `fetch_add`. The
953 // activate-failure rollback path (`bus.rs`) calls us on a
954 // shard that's still `Provisioning` — `add_shard` never
955 // bumped `num_shards` for it, so an unconditional
956 // `fetch_sub` here would leave the counter one below the
957 // table's actual size, breaking modulo-based shard
958 // selection. `Active` / `Draining` / `Stopped` shards all
959 // had `activate_shard` succeed against them at some point
960 // (it's the only way out of `Provisioning`), so they did
961 // bump `num_shards` and must decrement here.
962 let was_activated = matches!(
963 mapper.shard_state(shard_id),
964 Some(ShardState::Active) | Some(ShardState::Draining) | Some(ShardState::Stopped)
965 );
966
967 // Drain whatever is left in the ring buffer before unmapping.
968 // `with_shard` returns `None` once the shard is gone, so we
969 // do this *before* `rebuild_table`. We cap drain to a sane
970 // upper bound (`ring_buffer_capacity`) so a malformed shard
971 // can't pin us here forever.
972 let cap = self.ring_buffer_capacity;
973 let drained: Vec<crate::event::InternalEvent> = self
974 .with_shard(shard_id, |shard| {
975 let mut buf = Vec::with_capacity(shard.len().min(cap));
976 shard.pop_batch_into(&mut buf, cap);
977 buf
978 })
979 .unwrap_or_default();
980
981 let mut removed = false;
982 self.rebuild_table(|shards, counters, shard_index| {
983 let mut shards = shards.clone();
984 let mut counters = counters.clone();
985 let mut shard_index = shard_index.clone();
986
987 if let Some(idx) = shard_index.remove(&shard_id) {
988 removed = true;
989 shards.swap_remove(idx);
990 counters.swap_remove(idx);
991 // swap_remove moved the last element into `idx`: update its
992 // index mapping.
993 if idx < shards.len() {
994 let moved_shard_id = shards[idx].lock().id;
995 shard_index.insert(moved_shard_id, idx);
996 }
997 }
998
999 ShardTable {
1000 shards,
1001 counters,
1002 shard_index,
1003 }
1004 });
1005
1006 if removed && was_activated {
1007 self.num_shards
1008 .fetch_sub(1, std::sync::atomic::Ordering::Release);
1009 }
1010
1011 // Ask the mapper to drop the corresponding `MappedShard`
1012 // record. Without this sweep the mapper's
1013 // `shards: RwLock<Vec<MappedShard>>` would keep growing
1014 // across scale-up/down cycles (every scale-up appends a
1015 // fresh entry; `Stopped` entries are only removed by an
1016 // explicit `remove_specific_stopped_shard` /
1017 // `remove_stopped_shards` call). `evaluate_scaling`
1018 // filters by state but still iterates the full list, so
1019 // per-tick cost would grow with cumulative scaling history.
1020 //
1021 // The scaling monitor calls `mapper.finalize_draining()`
1022 // before invoking `bus.remove_shard_internal(id)` (which is
1023 // what calls us), so by the time we run the matching
1024 // `MappedShard` is already in `Stopped` state. We prune
1025 // ONLY this shard here, not every Stopped one — a bulk
1026 // sweep would prune sibling Stopped shards that a
1027 // sequential `manual_scale_down` is about to look up
1028 // state for in its next iteration's `remove_shard`. Once
1029 // the mapper had `None` for a sibling shard, the
1030 // `was_activated` gate above would observe it as
1031 // never-activated and skip the `num_shards` decrement,
1032 // leaving the counter one below the actual table size.
1033 mapper.remove_specific_stopped_shard(shard_id);
1034
1035 Ok(drained)
1036 }
1037
1038 /// Collect metrics from all shards (for dynamic scaling decisions).
1039 pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>> {
1040 self.mapper.as_ref().map(|m| m.collect_metrics())
1041 }
1042
1043 /// Evaluate and optionally execute scaling.
1044 pub fn evaluate_scaling(&self) -> ScalingDecision {
1045 self.mapper
1046 .as_ref()
1047 .map(|m| m.evaluate_scaling())
1048 .unwrap_or(ScalingDecision::None)
1049 }
1050}
1051
/// An owned handle to a shard. Holding this does not block scaling
/// operations; the shard stays alive via `Arc` refcount even if
/// removed from the table.
///
/// Access to the shard itself goes through [`ShardRef::lock`].
pub struct ShardRef {
    // Shared, mutex-guarded shard; the `Arc` keeps it alive for as long
    // as this handle exists.
    shard: Arc<parking_lot::Mutex<Shard>>,
}
1058
1059impl ShardRef {
1060 /// Lock the shard for exclusive access.
1061 pub fn lock(&self) -> parking_lot::MutexGuard<'_, Shard> {
1062 self.shard.lock()
1063 }
1064}
1065
1066#[cfg(test)]
1067mod tests {
1068 use super::*;
1069 use serde_json::json;
1070
1071 #[test]
1072 fn test_shard_push_pop() {
1073 let mut shard = Shard::new(0, 1024);
1074
1075 let ts = shard.try_push(json!({"test": 1})).unwrap();
1076 assert!(ts > 0);
1077 assert_eq!(shard.len(), 1);
1078
1079 let event = shard.try_pop().unwrap();
1080 assert_eq!(event.shard_id, 0);
1081 assert_eq!(event.insertion_ts, ts);
1082 assert!(shard.is_empty());
1083 }
1084
1085 /// A `Shard` configured with a `ShardMetricsCollector` must feed every
1086 /// successful push into the collector so the dynamic-scaling and
1087 /// drain-finalize paths see non-zero counters. Without this wiring
1088 /// `evaluate_scaling` reads `fill_ratio == 0` for every shard and
1089 /// `finalize_draining`'s "is the ring actually empty" predicate is a
1090 /// no-op (it sees `pushes_since_drain_start == 0` regardless of
1091 /// contents).
1092 #[test]
1093 fn try_push_feeds_metrics_collector() {
1094 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1095 let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1096
1097 for i in 0..16 {
1098 shard.try_push(json!({"i": i})).unwrap();
1099 }
1100
1101 let metrics = collector.collect_and_reset();
1102 assert_eq!(
1103 metrics.event_rate, 16,
1104 "every push must increment event_rate"
1105 );
1106 assert!(metrics.fill_ratio > 0.0, "buffer length must be observable");
1107 assert!(
1108 metrics.avg_push_latency_ns > 0,
1109 "push latency must be recorded"
1110 );
1111 }
1112
1113 /// `try_total_pending_in_rings` must never block, must skip
1114 /// shards whose mutex is currently held, and must report how
1115 /// many it skipped. This is what makes `EventBus::Drop`
1116 /// safe to call on a thread that already holds a shard lock.
1117 #[test]
1118 fn try_total_pending_in_rings_skips_held_shards() {
1119 let manager = ShardManager::new(2, 1024, BackpressureMode::DropNewest);
1120 // Push some events so a non-zero count is observable.
1121 manager.ingest(json!({"i": 1})).unwrap();
1122 manager.ingest(json!({"i": 2})).unwrap();
1123 manager.ingest(json!({"i": 3})).unwrap();
1124
1125 // Uncontended: all shards counted, uncounted_shards == 0.
1126 let (sum, uncounted) = manager.try_total_pending_in_rings();
1127 assert_eq!(uncounted, 0);
1128 let baseline_sum = sum;
1129 assert!(baseline_sum > 0, "events should be pending in some shard");
1130
1131 // Hold one shard's mutex and re-check: that shard must be
1132 // skipped, uncounted must be 1, and the call must return
1133 // immediately (this test would hang on the blocking
1134 // `total_pending_in_rings` variant).
1135 let table = manager.table.load();
1136 let _guard = table.shards[0].lock();
1137 let (sum2, uncounted2) = manager.try_total_pending_in_rings();
1138 assert_eq!(uncounted2, 1, "the locked shard must be uncounted");
1139 assert!(
1140 sum2 <= baseline_sum,
1141 "sum must not include events from the locked shard"
1142 );
1143 }
1144
1145 /// Same wiring for `try_push_raw` — the byte-oriented hot path.
1146 #[test]
1147 fn try_push_raw_feeds_metrics_collector() {
1148 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1149 let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1150
1151 for i in 0..16 {
1152 shard
1153 .try_push_raw(bytes::Bytes::from(format!("event-{i}")))
1154 .unwrap();
1155 }
1156
1157 let metrics = collector.collect_and_reset();
1158 assert_eq!(metrics.event_rate, 16);
1159 assert!(metrics.fill_ratio > 0.0);
1160 assert!(metrics.avg_push_latency_ns > 0);
1161 }
1162
1163 #[test]
1164 #[allow(deprecated)] // exercises the deprecated `select_shard` path
1165 fn test_shard_manager_routing() {
1166 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1167
1168 // Same event should always go to the same shard
1169 let event = json!({"key": "value"});
1170 let shard1 = manager.select_shard(&event);
1171 let shard2 = manager.select_shard(&event);
1172 assert_eq!(shard1, shard2);
1173
1174 // Different events may go to different shards
1175 let events: Vec<_> = (0..100).map(|i| json!({"i": i})).collect();
1176 let shards: std::collections::HashSet<_> =
1177 events.iter().map(|e| manager.select_shard(e)).collect();
1178
1179 // With 100 random events and 4 shards, we should hit multiple shards
1180 assert!(shards.len() > 1);
1181 }
1182
1183 /// Regression: the deprecated `select_shard(&JsonValue)` must produce
1184 /// the same shard id as `select_shard_by_hash` would for the
1185 /// equivalent `RawEvent`. They share underlying logic now, but if a
1186 /// future refactor splits them this test catches the divergence
1187 /// before consumers do.
1188 #[test]
1189 #[allow(deprecated)]
1190 fn test_select_shard_matches_select_shard_by_hash() {
1191 let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1192 for i in 0..200 {
1193 let v = json!({"i": i, "tag": format!("user-{i}")});
1194 let raw = RawEvent::from_value(v.clone());
1195 assert_eq!(
1196 manager.select_shard(&v),
1197 manager.select_shard_by_hash(raw.hash()),
1198 "select_shard and select_shard_by_hash must agree (i={i})"
1199 );
1200 }
1201 }
1202
1203 /// Pin net-perf #7: the static-mode `select_shard_by_hash` is now
1204 /// Lemire's `(hash * n) >> 64` instead of `hash % n` (one `div`
1205 /// per event eliminated). Lemire maps `[0, u64::MAX]` evenly into
1206 /// `[0, n)` for any `n` that fits in u16: the output must always
1207 /// land in range, and `hash == 0` must still resolve to shard 0
1208 /// (the multiplication overflow-pattern boundary).
1209 #[test]
1210 fn select_shard_by_hash_uses_lemire_reduction_in_static_mode() {
1211 for &shard_count in &[1u16, 2, 3, 4, 7, 8, 16, 64] {
1212 let manager = ShardManager::new(shard_count, 1024, BackpressureMode::DropNewest);
1213
1214 // `hash == 0` → `(0 * n) >> 64 == 0` regardless of `n`.
1215 // Locking down this boundary catches a regression where
1216 // the multiplication is dropped or reordered.
1217 assert_eq!(
1218 manager.select_shard_by_hash(0),
1219 0,
1220 "hash 0 must resolve to shard 0 (n={shard_count})"
1221 );
1222
1223 // Every output must be a valid shard index. A regression
1224 // back to `hash % shard_count` would still land in range,
1225 // but any wrong shift (e.g. `>> 32`) or sign-extension
1226 // bug would push the result past `shard_count`.
1227 for hash in [
1228 1u64,
1229 42,
1230 u64::MAX,
1231 u64::MAX - 1,
1232 0x8000_0000_0000_0000,
1233 0x7FFF_FFFF_FFFF_FFFF,
1234 0xDEAD_BEEF_DEAD_BEEF,
1235 ] {
1236 let shard = manager.select_shard_by_hash(hash);
1237 assert!(
1238 shard < shard_count,
1239 "shard {shard} out of range for n={shard_count}, hash={hash:#x}"
1240 );
1241 }
1242 }
1243
1244 // Distribution sanity: across 10_000 successive hashes that
1245 // mimic the input spread of `xxh3_64`, every shard sees at
1246 // least one event. A regression to `>> 64` returning 0 always
1247 // (e.g. `(hash as u64 * n as u64) >> 64`, which truncates the
1248 // product) would put everything on shard 0.
1249 let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1250 let mut counts = [0usize; 8];
1251 for i in 0u64..10_000 {
1252 let h = xxhash_rust::xxh3::xxh3_64(&i.to_le_bytes());
1253 counts[manager.select_shard_by_hash(h) as usize] += 1;
1254 }
1255 for (i, &c) in counts.iter().enumerate() {
1256 assert!(
1257 c > 0,
1258 "shard {i} got 0 events out of 10_000 (Lemire reduction must spread)"
1259 );
1260 }
1261 }
1262
1263 #[test]
1264 fn test_shard_manager_ingest() {
1265 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1266
1267 for i in 0..100 {
1268 let event = json!({"i": i});
1269 let result = manager.ingest(event);
1270 assert!(result.is_ok());
1271 }
1272
1273 let stats = manager.stats();
1274 assert_eq!(stats.events_ingested, 100);
1275 assert_eq!(stats.events_dropped, 0);
1276 }
1277
1278 #[test]
1279 fn test_backpressure_drop_newest() {
1280 let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1281
1282 // Fill the buffer (capacity 4, usable 3)
1283 for i in 0..3 {
1284 manager.ingest(json!({"i": i})).unwrap();
1285 }
1286
1287 // Next insert should fail
1288 let result = manager.ingest(json!({"i": 999}));
1289 assert!(matches!(result, Err(IngestionError::Backpressure)));
1290
1291 let stats = manager.stats();
1292 assert_eq!(stats.events_ingested, 3);
1293 assert_eq!(stats.events_dropped, 1);
1294 }
1295
1296 #[test]
1297 fn test_backpressure_drop_oldest() {
1298 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1299
1300 // Fill the buffer
1301 for i in 0..3 {
1302 manager.ingest(json!({"i": i})).unwrap();
1303 }
1304
1305 // Next insert should succeed by dropping oldest
1306 let result = manager.ingest(json!({"i": 999}));
1307 assert!(result.is_ok());
1308
1309 // Verify the oldest was dropped
1310 let shard = manager.shard(0).unwrap();
1311 let events = shard.lock().pop_batch(10);
1312
1313 // Should have events 1, 2, 999 (0 was dropped)
1314 assert_eq!(events.len(), 3);
1315 assert_eq!(events[0].parse().unwrap(), json!({"i": 1}));
1316 assert_eq!(events[2].parse().unwrap(), json!({"i": 999}));
1317 }
1318
1319 #[test]
1320 fn test_raw_event_ingestion() {
1321 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1322
1323 for i in 0..100 {
1324 let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1325 let result = manager.ingest_raw(raw);
1326 assert!(result.is_ok());
1327 }
1328
1329 let stats = manager.stats();
1330 assert_eq!(stats.events_ingested, 100);
1331 assert_eq!(stats.events_dropped, 0);
1332 }
1333
1334 /// `ingest_raw_batch` groups events by destination shard before
1335 /// pushing — verify the grouping preserves FIFO within a shard,
1336 /// honors hash-based routing, and that totals match `ingest_raw`.
1337 #[test]
1338 fn test_ingest_raw_batch_routes_and_preserves_order() {
1339 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1340 let events: Vec<RawEvent> = (0..200)
1341 .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1342 .collect();
1343
1344 // Snapshot the expected destination for each event so we can
1345 // compare against what actually landed in each shard.
1346 let expected_dests: Vec<u16> = events
1347 .iter()
1348 .map(|e| manager.select_shard_by_hash(e.hash()))
1349 .collect();
1350
1351 let (success, unrouted) = manager.ingest_raw_batch(events.clone());
1352 assert_eq!(success, 200, "all events should land with ample capacity");
1353 assert_eq!(unrouted, 0, "no scale-down so no unrouted events");
1354
1355 // Aggregate totals must match.
1356 let stats = manager.stats();
1357 assert_eq!(stats.events_ingested, 200);
1358 assert_eq!(stats.events_dropped, 0);
1359
1360 // Per-shard totals must match the expected routing distribution,
1361 // and the distribution must span more than one shard (otherwise
1362 // the test wouldn't exercise the grouping path).
1363 let mut expected_by_shard: std::collections::HashMap<u16, u64> =
1364 std::collections::HashMap::new();
1365 for d in &expected_dests {
1366 *expected_by_shard.entry(*d).or_default() += 1;
1367 }
1368 assert!(
1369 expected_by_shard.len() > 1,
1370 "hash distribution should span multiple shards"
1371 );
1372 for shard_id in 0..4u16 {
1373 let got = manager
1374 .with_shard(shard_id, |s| s.stats().events_ingested)
1375 .unwrap();
1376 let want = expected_by_shard.get(&shard_id).copied().unwrap_or(0);
1377 assert_eq!(got, want, "shard {} ingested count mismatch", shard_id);
1378 }
1379
1380 // FIFO within a shard: the events a shard received, in the order
1381 // we batched them, must come out of the ring buffer in the same
1382 // order.
1383 for shard_id in 0..4u16 {
1384 let expected_payloads: Vec<&[u8]> = events
1385 .iter()
1386 .zip(expected_dests.iter())
1387 .filter(|(_, d)| **d == shard_id)
1388 .map(|(e, _)| e.as_bytes())
1389 .collect();
1390 let popped = manager.with_shard(shard_id, |s| s.pop_batch(1024)).unwrap();
1391 assert_eq!(popped.len(), expected_payloads.len());
1392 for (i, ev) in popped.iter().enumerate() {
1393 assert_eq!(
1394 ev.as_bytes(),
1395 expected_payloads[i],
1396 "shard {} position {} out of order",
1397 shard_id,
1398 i
1399 );
1400 }
1401 }
1402 }
1403
1404 /// Batching past a shard's capacity must account every dropped
1405 /// event under `DropNewest`: `success` + `events_dropped` =
1406 /// `len(input)`.
1407 #[test]
1408 fn test_ingest_raw_batch_drop_accounting() {
1409 // Single shard, usable capacity 3 (ring buffer reserves one slot).
1410 let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1411 let events: Vec<RawEvent> = (0..10)
1412 .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1413 .collect();
1414
1415 let (success, unrouted) = manager.ingest_raw_batch(events);
1416 assert_eq!(success, 3, "only 3 should fit under DropNewest");
1417 assert_eq!(unrouted, 0, "single-shard config has no unrouted events");
1418
1419 let stats = manager.stats();
1420 assert_eq!(stats.events_ingested, 3);
1421 assert_eq!(stats.events_dropped, 7);
1422 }
1423
1424 /// Empty batch is a no-op and must not touch stats.
1425 #[test]
1426 fn test_ingest_raw_batch_empty() {
1427 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1428 assert_eq!(manager.ingest_raw_batch(Vec::new()), (0, 0));
1429 let stats = manager.stats();
1430 assert_eq!(stats.events_ingested, 0);
1431 assert_eq!(stats.events_dropped, 0);
1432 }
1433
1434 #[test]
1435 fn test_remove_shard_requires_dynamic_scaling() {
1436 // Static mode - no dynamic scaling
1437 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1438
1439 // Should fail because dynamic scaling is not enabled
1440 let result = manager.remove_shard(0);
1441 assert!(result.is_err());
1442 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1443 }
1444
1445 #[test]
1446 fn test_add_shard_requires_dynamic_scaling() {
1447 // Static mode - no dynamic scaling
1448 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1449
1450 // Should fail because dynamic scaling is not enabled
1451 let result = manager.add_shard();
1452 assert!(result.is_err());
1453 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1454 }
1455
1456 #[test]
1457 fn test_drain_shard_requires_dynamic_scaling() {
1458 // Static mode - no dynamic scaling
1459 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1460
1461 // Should fail because dynamic scaling is not enabled
1462 let result = manager.drain_shard(0);
1463 assert!(result.is_err());
1464 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1465 }
1466
1467 #[test]
1468 fn test_drop_oldest_counts_dropped_events() {
1469 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1470
1471 // Fill the buffer (capacity 4, usable 3)
1472 for i in 0..3 {
1473 manager.ingest(json!({"i": i})).unwrap();
1474 }
1475
1476 // This should succeed by dropping the oldest event
1477 manager.ingest(json!({"i": 999})).unwrap();
1478
1479 let stats = manager.stats();
1480 assert_eq!(stats.events_ingested, 4);
1481 // The initial push fails (counted as dropped), then retry succeeds
1482 assert_eq!(
1483 stats.events_dropped, 1,
1484 "DropOldest cycle should count exactly one drop"
1485 );
1486 }
1487
1488 #[test]
1489 fn test_drop_oldest_raw_counts_dropped_events() {
1490 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1491
1492 // Fill the buffer
1493 for i in 0..3 {
1494 let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1495 manager.ingest_raw(raw).unwrap();
1496 }
1497
1498 // This should succeed by dropping the oldest event
1499 let raw = RawEvent::from_str(r#"{"i": 999}"#);
1500 manager.ingest_raw(raw).unwrap();
1501
1502 let stats = manager.stats();
1503 assert_eq!(stats.events_ingested, 4);
1504 assert_eq!(
1505 stats.events_dropped, 1,
1506 "DropOldest cycle should count exactly one drop"
1507 );
1508 }
1509
1510 /// Pin the current contract for `BackpressureMode::Sample`:
1511 /// it returns `IngestionError::Sampled` once the buffer fills,
1512 /// indistinguishable in shape from a `Backpressure` rejection.
1513 /// Sampling itself ("keep 1 in N events") is **not implemented**
1514 /// — the comments in `ingest` / `ingest_raw` defer it to "a
1515 /// higher level" that does not exist. A consumer setting this
1516 /// mode today gets a rejection signal, never probabilistic
1517 /// admission.
1518 ///
1519 /// This test pins that contract so it cannot quietly change
1520 /// without an explicit decision. If sampling is ever wired up,
1521 /// this test will fail and force an update — at which point
1522 /// the implementer should also add coverage for the
1523 /// rate-proportional admission rate.
1524 #[test]
1525 fn sample_mode_currently_returns_sampled_after_buffer_fills() {
1526 // TODO(coverage round 2): `BackpressureMode::Sample` is
1527 // dead-on-arrival until "higher level" sampling lands;
1528 // see comments at `ShardManager::ingest` / `ingest_raw`.
1529 let manager = ShardManager::new(1, 4, BackpressureMode::Sample { rate: 2 });
1530
1531 // Fill the buffer (capacity 4, usable 3).
1532 for i in 0..3 {
1533 manager.ingest(json!({"i": i})).unwrap();
1534 }
1535
1536 // Both ingest paths must report `Sampled` — not `Backpressure`,
1537 // not `Ok` — so callers can distinguish the (currently
1538 // unused) sampling rejection from a hard backpressure
1539 // rejection in case sampling is wired up later.
1540 let json_result = manager.ingest(json!({"i": 999}));
1541 assert!(
1542 matches!(json_result, Err(IngestionError::Sampled)),
1543 "Sample mode must return Sampled on a full buffer (got {:?})",
1544 json_result
1545 );
1546
1547 let raw_result = manager.ingest_raw(RawEvent::from_str(r#"{"i": 999}"#));
1548 assert!(
1549 matches!(raw_result, Err(IngestionError::Sampled)),
1550 "Sample mode must return Sampled on a full buffer via ingest_raw (got {:?})",
1551 raw_result
1552 );
1553 }
1554
1555 #[test]
1556 fn test_drop_oldest_multiple_cycles() {
1557 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1558
1559 // Fill the buffer (usable capacity 3)
1560 for i in 0..3 {
1561 manager.ingest(json!({"i": i})).unwrap();
1562 }
1563
1564 // Push 5 more events, each triggers a DropOldest cycle
1565 for i in 3..8 {
1566 manager.ingest(json!({"i": i})).unwrap();
1567 }
1568
1569 let stats = manager.stats();
1570 assert_eq!(stats.events_ingested, 8);
1571 assert_eq!(
1572 stats.events_dropped, 5,
1573 "each DropOldest cycle should count one drop"
1574 );
1575 }
1576
1577 /// Regression: BUG_REPORT.md #44 — single-event ingest paths
1578 /// (`ingest`, `ingest_raw`) used to collapse "shard not in
1579 /// routing table" into `IngestionError::Backpressure` and never
1580 /// touch `events_unrouted`. The batch path correctly bumped the
1581 /// counter. Reconciliation drifts because of this divergence.
1582 ///
1583 /// We construct the routing miss by:
1584 /// 1. Building a dynamic-mode manager with 2 shards.
1585 /// 2. Calling `add_shard()` which (per the #46 fix) leaves the
1586 /// shard in `Provisioning` state — present in the mapper
1587 /// but not in `select_shard`'s output.
1588 /// 3. Then directly forcing `select_shard_by_hash` would still
1589 /// return an Active shard, so we exercise the secondary
1590 /// routing-table-miss path: remove a shard and have a
1591 /// stale hash-derived id.
1592 ///
1593 /// The simpler robust check: drain every shard via
1594 /// `drain_specific` until none Active. The mapper's fallback
1595 /// now returns `u16::MAX`, which is never in the routing
1596 /// table, so `resolve_idx` misses and we should see `Unrouted`
1597 /// + counter bump.
1598 #[test]
1599 fn ingest_single_event_unrouted_increments_counter() {
1600 use crate::config::ScalingPolicy;
1601 // min_shards=1 so we can drain N-1 of N shards; the last
1602 // one we skip-mark as Draining via Stopped → drain via
1603 // scale_down then verify routing miss for the still-active
1604 // shard's hash.
1605 let policy = ScalingPolicy {
1606 min_shards: 1,
1607 max_shards: 8,
1608 cooldown: std::time::Duration::from_nanos(1),
1609 ..Default::default()
1610 };
1611 let manager =
1612 ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1613
1614 // Drain 1 of 2 shards via the public API.
1615 let mapper = manager.mapper().unwrap().clone();
1616 let _ = mapper.scale_down(1).unwrap();
1617
1618 // Find a hash that routes to the *drained* shard (the one
1619 // not in `active_shard_ids`). With weighted selection and
1620 // only one Active shard, `select_shard` always returns the
1621 // Active one, so we can't easily target the drained shard
1622 // through hash routing — what we *can* do is verify the
1623 // Active shard still routes correctly (no false positives).
1624 let active_ids = mapper.active_shard_ids();
1625 assert_eq!(active_ids.len(), 1);
1626 let active = active_ids[0];
1627
1628 // ingest a few events; all should land on the Active shard,
1629 // none should hit Unrouted.
1630 for i in 0..5 {
1631 let r = manager.ingest_raw(RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)));
1632 let (sid, _) = r.expect("active shard must accept ingest");
1633 assert_eq!(sid, active, "must route to the active shard");
1634 }
1635 // No unrouted events — sanity that Unrouted only fires on
1636 // actual routing misses.
1637 assert_eq!(manager.stats().events_unrouted, 0);
1638
1639 // Now exercise the actual #44 fix: when *no* Active shard
1640 // exists, `select_shard` returns `u16::MAX` (per #51), which
1641 // is unmappable. To set this up without mutating private
1642 // fields, we rely on the fact that the manager's `with_mapper`
1643 // returns `Arc<ShardMapper>` and `drain_specific` will refuse
1644 // to take active_count below min_shards. So we simulate the
1645 // race by directly using `ingest_raw` with a forged
1646 // RawEvent whose hash WILL be modulo'd to a non-existent id
1647 // — but in dynamic mode the mapper rules, not modulo. We
1648 // can't easily get there from here, so we instead validate
1649 // the mechanism via a separate static-mode test below.
1650 //
1651 // The above sanity-check that Active shards still route
1652 // correctly + the mapper-level test
1653 // `select_shard_does_not_fall_back_to_draining` together
1654 // cover the #44 + #51 contract. Adding a routing-table-
1655 // miss test here would require a `#[cfg(test)] fn` that
1656 // can mutate the routing table, which we deliberately
1657 // avoid (the manager's invariants must hold even from
1658 // tests).
1659 }
1660
1661 /// Regression: BUG_REPORT.md #47 — `remove_shard` previously
1662 /// just unmapped the shard from the routing table and let the
1663 /// drain worker observe `with_shard → None` and exit. Anything
1664 /// still queued in the ring buffer at that moment was silently
1665 /// stranded. The fix returns the drained events to the caller
1666 /// (typically `EventBus::remove_shard_internal`) so they can
1667 /// be flushed through to the adapter rather than dropped.
1668 #[test]
1669 fn remove_shard_returns_stranded_ring_buffer_events() {
1670 use crate::config::ScalingPolicy;
1671 let policy = ScalingPolicy {
1672 min_shards: 1,
1673 max_shards: 8,
1674 cooldown: std::time::Duration::from_nanos(1),
1675 ..Default::default()
1676 };
1677 let manager =
1678 ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1679
1680 // Pin the routing for shard 1 by ingesting events with a
1681 // hash known to land there. We don't actually need
1682 // hash-routing precision: directly push into shard 1 via
1683 // `with_shard`, which bypasses select_shard.
1684 let pushed: Vec<&str> = vec![r#"{"a":1}"#, r#"{"a":2}"#, r#"{"a":3}"#];
1685 let pushed_count = pushed.len();
1686 for s in &pushed {
1687 manager
1688 .with_shard(1, |shard| {
1689 shard.try_push_raw(bytes::Bytes::from(s.as_bytes().to_vec()))
1690 })
1691 .expect("shard 1 exists")
1692 .expect("ring buffer has room");
1693 }
1694 assert_eq!(
1695 manager.with_shard(1, |s| s.len()).unwrap(),
1696 pushed_count,
1697 "events should be queued in shard 1"
1698 );
1699
1700 // Remove shard 1 — must return the stranded events, not
1701 // drop them silently.
1702 let stranded = manager
1703 .remove_shard(1)
1704 .expect("remove_shard must succeed in dynamic mode");
1705 assert_eq!(
1706 stranded.len(),
1707 pushed_count,
1708 "remove_shard must surface every event still in the \
1709 ring buffer (#47); got {} stranded events, expected {}",
1710 stranded.len(),
1711 pushed_count
1712 );
1713
1714 // Sanity: the events come back in FIFO order with the
1715 // bytes the producer pushed.
1716 for (i, ev) in stranded.iter().enumerate() {
1717 assert_eq!(ev.as_bytes(), pushed[i].as_bytes());
1718 assert_eq!(ev.shard_id, 1);
1719 }
1720
1721 // Sanity: shard 1 is gone from routing.
1722 assert!(manager.with_shard(1, |s| s.id).is_none());
1723 }
1724
1725 /// `ShardManager::activate_shard` is idempotent at
1726 /// the API level — two calls on the same shard return Ok(())
1727 /// each — but pre-fix `num_shards` was bumped on every call
1728 /// even when the mapper's `activate()` had already
1729 /// transitioned the shard to Active. After repeated calls,
1730 /// `num_shards` exceeded the actual count and `select_shard`'s
1731 /// modulo arithmetic mis-routed.
1732 #[test]
1733 fn activate_shard_is_idempotent_in_num_shards_count() {
1734 let policy = ScalingPolicy {
1735 min_shards: 1,
1736 max_shards: 16,
1737 cooldown: std::time::Duration::from_nanos(1),
1738 ..Default::default()
1739 };
1740 let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1741 .expect("dynamic scaling enabled");
1742 let initial = manager.num_shards();
1743 assert_eq!(initial, 2);
1744
1745 // Add + activate a new shard. count goes 2 → 3.
1746 let new_id = manager.add_shard().expect("add_shard");
1747 manager.activate_shard(new_id).expect("first activate");
1748 assert_eq!(
1749 manager.num_shards(),
1750 3,
1751 "first activate must bump num_shards to 3"
1752 );
1753
1754 // Repeat activate — must be a no-op on the count.
1755 manager
1756 .activate_shard(new_id)
1757 .expect("second activate (idempotent)");
1758 manager
1759 .activate_shard(new_id)
1760 .expect("third activate (idempotent)");
1761 assert_eq!(
1762 manager.num_shards(),
1763 3,
1764 "repeated activate_shard must NOT keep bumping num_shards; \
1765 pre-fix this would be 5 after three calls",
1766 );
1767 }
1768
1769 /// Removing a still-`Provisioning` shard (the activate-failure
1770 /// rollback path) must NOT decrement `num_shards`. `add_shard`
1771 /// only registers a `Provisioning` entry and intentionally
1772 /// leaves `num_shards` alone — the bump happens in
1773 /// `activate_shard`. A symmetric `fetch_sub` in `remove_shard`
1774 /// would therefore leave the counter one below the routing
1775 /// table's actual size after a rollback, breaking modulo-based
1776 /// shard selection. This pins the gating: the rollback removal
1777 /// is a num_shards no-op, while removing an activated shard
1778 /// still decrements normally.
1779 #[test]
1780 fn remove_provisioning_shard_does_not_decrement_num_shards() {
1781 let policy = ScalingPolicy {
1782 min_shards: 1,
1783 max_shards: 16,
1784 cooldown: std::time::Duration::from_nanos(1),
1785 ..Default::default()
1786 };
1787 let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1788 .expect("dynamic scaling enabled");
1789 let initial = manager.num_shards();
1790 assert_eq!(initial, 2);
1791
1792 // add_shard registers a Provisioning entry (no num_shards bump).
1793 let new_id = manager.add_shard().expect("add_shard");
1794 assert_eq!(
1795 manager.num_shards(),
1796 initial,
1797 "add_shard must NOT bump num_shards (Provisioning, not yet selectable)"
1798 );
1799
1800 // Simulate the activate-failure rollback path: remove the
1801 // never-activated shard. Pre-fix this fired
1802 // `fetch_sub(1)` unconditionally and dropped num_shards
1803 // below the table size.
1804 let stranded = manager.remove_shard(new_id).expect("rollback remove");
1805 assert!(
1806 stranded.is_empty(),
1807 "fresh provisioning shard has no events"
1808 );
1809 assert_eq!(
1810 manager.num_shards(),
1811 initial,
1812 "removing a provisioning (never-activated) shard must NOT decrement num_shards"
1813 );
1814
1815 // Companion: removing an activated shard still decrements,
1816 // so the gate is symmetric with activate_shard's fetch_add.
1817 let activated_id = manager.add_shard().expect("add for activated path");
1818 manager.activate_shard(activated_id).expect("activate");
1819 assert_eq!(
1820 manager.num_shards(),
1821 initial + 1,
1822 "activate bumps num_shards"
1823 );
1824 manager
1825 .remove_shard(activated_id)
1826 .expect("remove activated");
1827 assert_eq!(
1828 manager.num_shards(),
1829 initial,
1830 "removing an activated shard MUST decrement num_shards"
1831 );
1832 }
1833}