Skip to main content

net/shard/
mod.rs

1//! Shard management for parallel event ingestion.
2//!
3//! The shard module provides:
4//! - Lock-free ring buffers for high-throughput event queuing
5//! - Per-shard timestamp generation (no cross-shard contention)
6//! - Batch assembly with adaptive sizing
7//! - Shard manager for coordinating multiple shards
8//! - Dynamic shard scaling with weighted producer routing
9
10mod batch;
11mod mapper;
12mod ring_buffer;
13
14pub use batch::{AdaptiveBatcher, BatchWorker};
15pub use mapper::{
16    ScalingDecision, ScalingError, ShardMapper, ShardMetrics, ShardMetricsCollector, ShardState,
17};
18// `RingBuffer` and `BufferFullError` are intentionally NOT re-exported.
19// External callers go through `EventBus` / `ShardManager`, which
20// uphold the SPSC contract via `Mutex<Shard>`. Exposing the raw ring
21// buffer publicly was a silent-UB footgun — anyone wrapping it in an
22// `Arc` and pushing from two threads got data corruption with no
23// compile-time signal. `BufferFullError` is not
24// re-exported here either: callers see it as `IngestionError::Backpressure`.
25pub(crate) use ring_buffer::RingBuffer;
26
27// Re-export ScalingPolicy from config for convenience
28pub use crate::config::ScalingPolicy;
29
30use bytes::Bytes;
31
32use crate::config::BackpressureMode;
33use crate::error::IngestionError;
34use crate::event::{InternalEvent, RawEvent};
35use crate::timestamp::TimestampGenerator;
36
37use serde_json::Value as JsonValue;
38use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
39use std::sync::Arc;
40use std::time::Instant;
41
/// Per-shard event counters backed by atomics.
///
/// A `Shard` holds these behind an `Arc` so that
/// `ShardManager::stats()` can sum them across shards without taking
/// any shard's mutex.
#[derive(Debug, Default)]
pub struct ShardCounters {
    /// Number of events accepted into this shard's ring buffer.
    pub events_ingested: AtomicU64,
    /// Number of events dropped because of backpressure.
    pub events_dropped: AtomicU64,
    /// Number of batches successfully handed to the adapter.
    pub batches_dispatched: AtomicU64,
}
54
/// Plain-value snapshot of shard statistics.
///
/// Used both for a single shard and for the totals aggregated by
/// `ShardManager::stats()`.
#[derive(Debug, Default, Clone, Copy)]
pub struct ShardStats {
    /// Events accepted into a ring buffer.
    pub events_ingested: u64,
    /// Events dropped because of backpressure.
    pub events_dropped: u64,
    /// Batches handed to the adapter.
    pub batches_dispatched: u64,
    /// Events that had no resolvable destination shard, e.g. because
    /// the routing table was rebuilt mid-dispatch and the hashed shard
    /// id was no longer present. Such events cannot be attributed to
    /// any one shard, so the count lives on the `ShardManager` and
    /// only appears in the aggregated `stats()` result.
    pub events_unrouted: u64,
}
71
72impl ShardCounters {
73    /// Load a consistent snapshot of the counters.
74    ///
75    /// `events_unrouted` is left at zero here — it is a manager-level
76    /// counter, not a per-shard one. `ShardManager::stats()` fills it
77    /// in after summing per-shard fields.
78    #[inline]
79    pub fn snapshot(&self) -> ShardStats {
80        ShardStats {
81            events_ingested: self.events_ingested.load(AtomicOrdering::Relaxed),
82            events_dropped: self.events_dropped.load(AtomicOrdering::Relaxed),
83            batches_dispatched: self.batches_dispatched.load(AtomicOrdering::Relaxed),
84            events_unrouted: 0,
85        }
86    }
87}
88
89/// A single shard with its own ring buffer and timestamp generator.
90pub struct Shard {
91    /// Shard identifier.
92    pub id: u16,
93    /// Ring buffer for event queuing.
94    ring_buffer: RingBuffer<InternalEvent>,
95    /// Shard-local timestamp generator (no contention).
96    timestamp_gen: TimestampGenerator,
97    /// Shared atomic counters (also referenced from `ShardTable` for
98    /// lock-free aggregation).
99    counters: Arc<ShardCounters>,
100    /// Optional metrics collector for dynamic scaling.
101    metrics_collector: Option<Arc<ShardMetricsCollector>>,
102    /// Ring buffer capacity (for metrics).
103    capacity: usize,
104}
105
106impl Shard {
107    /// Create a new shard.
108    pub fn new(id: u16, capacity: usize) -> Self {
109        Self {
110            id,
111            ring_buffer: RingBuffer::new(capacity),
112            timestamp_gen: TimestampGenerator::new(),
113            counters: Arc::new(ShardCounters::default()),
114            metrics_collector: None,
115            capacity,
116        }
117    }
118
119    /// Create a new shard with a metrics collector for dynamic scaling.
120    pub fn with_metrics(id: u16, capacity: usize, metrics: Arc<ShardMetricsCollector>) -> Self {
121        Self {
122            id,
123            ring_buffer: RingBuffer::new(capacity),
124            timestamp_gen: TimestampGenerator::new(),
125            counters: Arc::new(ShardCounters::default()),
126            metrics_collector: Some(metrics),
127            capacity,
128        }
129    }
130
131    /// Clone the atomic counter handle (for lock-free aggregation).
132    #[inline]
133    pub fn counters(&self) -> Arc<ShardCounters> {
134        self.counters.clone()
135    }
136
137    /// Set the metrics collector.
138    pub fn set_metrics_collector(&mut self, metrics: Arc<ShardMetricsCollector>) {
139        self.metrics_collector = Some(metrics);
140    }
141
142    /// Try to push a raw event (pre-serialized bytes) into the shard's ring buffer.
143    /// Returns the assigned insertion timestamp on success.
144    ///
145    /// This is the fastest ingestion path - no serialization or hashing needed.
146    #[inline]
147    pub fn try_push_raw(&mut self, raw: Bytes) -> Result<u64, IngestionError> {
148        let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
149        let ts = self.timestamp_gen.next();
150        let event = InternalEvent::new(raw, ts, self.id);
151
152        match self.ring_buffer.try_push(event) {
153            Ok(()) => {
154                self.counters
155                    .events_ingested
156                    .fetch_add(1, AtomicOrdering::Relaxed);
157                if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
158                    collector.record_push(start.elapsed().as_nanos() as u64);
159                    collector.record_buffer_len(self.ring_buffer.len());
160                }
161                Ok(ts)
162            }
163            Err(_) => {
164                self.counters
165                    .events_dropped
166                    .fetch_add(1, AtomicOrdering::Relaxed);
167                Err(IngestionError::Backpressure)
168            }
169        }
170    }
171
172    /// Try to push a JSON value into the shard's ring buffer.
173    /// Returns the assigned insertion timestamp on success.
174    ///
175    /// This serializes the value once before storing.
176    #[inline]
177    pub fn try_push(&mut self, raw: JsonValue) -> Result<u64, IngestionError> {
178        let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
179        let ts = self.timestamp_gen.next();
180        let event = InternalEvent::from_value(raw, ts, self.id);
181
182        match self.ring_buffer.try_push(event) {
183            Ok(()) => {
184                self.counters
185                    .events_ingested
186                    .fetch_add(1, AtomicOrdering::Relaxed);
187                if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
188                    collector.record_push(start.elapsed().as_nanos() as u64);
189                    collector.record_buffer_len(self.ring_buffer.len());
190                }
191                Ok(ts)
192            }
193            Err(_) => {
194                self.counters
195                    .events_dropped
196                    .fetch_add(1, AtomicOrdering::Relaxed);
197                Err(IngestionError::Backpressure)
198            }
199        }
200    }
201
202    /// Pop a batch of events from the ring buffer.
203    ///
204    /// Allocates a fresh `Vec`. Prefer [`pop_batch_into`] in drain
205    /// loops where the per-cycle `Vec` allocation should happen
206    /// outside the shard mutex.
207    ///
208    /// [`pop_batch_into`]: Self::pop_batch_into
209    #[inline]
210    pub fn pop_batch(&mut self, max: usize) -> Vec<InternalEvent> {
211        let out = self.ring_buffer.pop_batch(max);
212        if let Some(collector) = &self.metrics_collector {
213            collector.record_buffer_len(self.ring_buffer.len());
214        }
215        out
216    }
217
218    /// Pop a batch of events into a caller-owned buffer.
219    ///
220    /// Append semantics: does **not** clear `dst`; reserves
221    /// `count` slots and pushes drained elements onto the end.
222    /// Returns the number drained this call. Use this in
223    /// steady-state drain loops where the caller keeps a scratch
224    /// `Vec` across cycles, so the per-cycle allocation moves out
225    /// of the consumer's critical section.
226    #[inline]
227    pub fn pop_batch_into(&mut self, dst: &mut Vec<InternalEvent>, max: usize) -> usize {
228        let n = self.ring_buffer.pop_batch_into(dst, max);
229        if let Some(collector) = &self.metrics_collector {
230            collector.record_buffer_len(self.ring_buffer.len());
231        }
232        n
233    }
234
235    /// Try to pop a single event from the ring buffer.
236    #[inline]
237    pub fn try_pop(&mut self) -> Option<InternalEvent> {
238        let out = self.ring_buffer.try_pop();
239        if let Some(collector) = &self.metrics_collector {
240            collector.record_buffer_len(self.ring_buffer.len());
241        }
242        out
243    }
244
245    /// Producer-side eviction of the oldest event.
246    ///
247    /// Used by `BackpressureMode::DropOldest` to make room for a
248    /// new push when the buffer is full. Bypasses the ring buffer's
249    /// consumer-thread tracking (the producer thread is calling
250    /// what is normally a consumer-side operation). Safe because
251    /// the outer shard mutex serializes this against any concurrent
252    /// `try_pop` from the legitimate consumer (the batch worker).
253    #[inline]
254    pub(crate) fn evict_oldest(&mut self) -> Option<InternalEvent> {
255        self.ring_buffer.evict_oldest()
256    }
257
258    /// Get the current buffer length.
259    #[inline]
260    pub fn len(&self) -> usize {
261        self.ring_buffer.len()
262    }
263
264    /// Check if the buffer is empty.
265    #[inline]
266    pub fn is_empty(&self) -> bool {
267        self.ring_buffer.is_empty()
268    }
269
270    /// Check if the buffer is full.
271    #[inline]
272    pub fn is_full(&self) -> bool {
273        self.ring_buffer.is_full()
274    }
275
276    /// Get the fill ratio (0.0 - 1.0).
277    #[inline]
278    pub fn fill_ratio(&self) -> f64 {
279        if self.capacity == 0 {
280            0.0
281        } else {
282            self.ring_buffer.len() as f64 / self.capacity as f64
283        }
284    }
285
286    /// Get the ring buffer capacity.
287    #[inline]
288    pub fn capacity(&self) -> usize {
289        self.capacity
290    }
291
292    /// Get a snapshot of shard statistics.
293    pub fn stats(&self) -> ShardStats {
294        self.counters.snapshot()
295    }
296
297    /// Record a batch dispatch.
298    pub fn record_batch_dispatch(&self) {
299        self.counters
300            .batches_dispatched
301            .fetch_add(1, AtomicOrdering::Relaxed);
302    }
303}
304
305/// Immutable routing table: shards + index + counter handles.
306///
307/// Placed behind an `ArcSwap` on `ShardManager` so the common read
308/// path (`ingest`, `ingest_raw`, `with_shard`, `stats`) is
309/// lock-free. Rebuilt on scale up/down via RCU-style swap.
310pub struct ShardTable {
311    /// All shards, indexed by position. `Arc<Mutex<Shard>>` lets a new
312    /// table share shard handles with the previous table (cheap Arc
313    /// clones during rebuild).
314    shards: Vec<Arc<parking_lot::Mutex<Shard>>>,
315    /// Parallel vector of counter handles. Exposes stats without
316    /// locking the shard mutex.
317    counters: Vec<Arc<ShardCounters>>,
318    /// Map from shard ID to index in `shards`/`counters`.
319    shard_index: std::collections::HashMap<u16, usize>,
320}
321
322impl ShardTable {
323    fn new(shards: Vec<Shard>) -> Self {
324        let mut shard_index = std::collections::HashMap::with_capacity(shards.len());
325        let mut counters = Vec::with_capacity(shards.len());
326        let shards: Vec<_> = shards
327            .into_iter()
328            .enumerate()
329            .map(|(idx, s)| {
330                shard_index.insert(s.id, idx);
331                counters.push(s.counters());
332                Arc::new(parking_lot::Mutex::new(s))
333            })
334            .collect();
335        Self {
336            shards,
337            counters,
338            shard_index,
339        }
340    }
341}
342
343/// Manager for multiple shards.
344///
345/// The ShardManager can operate in two modes:
346/// 1. Static mode (default): Fixed number of shards, simple hash-based routing
347/// 2. Dynamic mode: Shards can be added/removed based on load, weighted routing
348pub struct ShardManager {
349    /// Routing table. Swapped atomically on scale up/down so readers
350    /// never see a partially-updated `(shards, shard_index)` pair.
351    table: arc_swap::ArcSwap<ShardTable>,
352    /// Current number of active shards.
353    num_shards: std::sync::atomic::AtomicU16,
354    /// Backpressure mode.
355    backpressure_mode: BackpressureMode,
356    /// Ring buffer capacity for new shards.
357    ring_buffer_capacity: usize,
358    /// Optional shard mapper for dynamic scaling.
359    mapper: Option<Arc<ShardMapper>>,
360    /// Serializes concurrent `add_shard` / `remove_shard` rebuilds.
361    /// Not on the ingest path.
362    rebuild_lock: parking_lot::Mutex<()>,
363    /// Events dropped because no destination shard was resolvable.
364    /// Distinct from per-shard `events_dropped` (which tracks
365    /// backpressure on a known shard) — this counts events whose
366    /// hashed shard id was missing from the routing table at lookup
367    /// time, e.g. due to a concurrent scale-down. Surfaced via
368    /// `stats().events_unrouted`.
369    events_unrouted: AtomicU64,
370}
371
372impl ShardManager {
373    /// Create a new shard manager (static mode).
374    pub fn new(
375        num_shards: u16,
376        ring_buffer_capacity: usize,
377        backpressure_mode: BackpressureMode,
378    ) -> Self {
379        let shards: Vec<Shard> = (0..num_shards)
380            .map(|id| Shard::new(id, ring_buffer_capacity))
381            .collect();
382
383        Self {
384            table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
385            num_shards: std::sync::atomic::AtomicU16::new(num_shards),
386            backpressure_mode,
387            ring_buffer_capacity,
388            mapper: None,
389            rebuild_lock: parking_lot::Mutex::new(()),
390            events_unrouted: AtomicU64::new(0),
391        }
392    }
393
394    /// Create a new shard manager with dynamic scaling enabled.
395    pub fn with_mapper(
396        num_shards: u16,
397        ring_buffer_capacity: usize,
398        backpressure_mode: BackpressureMode,
399        policy: ScalingPolicy,
400    ) -> Result<Self, ScalingError> {
401        let mapper = Arc::new(ShardMapper::new(num_shards, ring_buffer_capacity, policy)?);
402
403        let shards: Vec<Shard> = (0..num_shards)
404            .map(|id| {
405                let metrics = mapper.metrics_collector(id).ok_or_else(|| {
406                    ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", id))
407                })?;
408                Ok(Shard::with_metrics(id, ring_buffer_capacity, metrics))
409            })
410            .collect::<Result<Vec<_>, ScalingError>>()?;
411
412        Ok(Self {
413            table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
414            num_shards: std::sync::atomic::AtomicU16::new(num_shards),
415            backpressure_mode,
416            ring_buffer_capacity,
417            mapper: Some(mapper),
418            rebuild_lock: parking_lot::Mutex::new(()),
419            events_unrouted: AtomicU64::new(0),
420        })
421    }
422
423    /// Get the shard mapper (if dynamic scaling is enabled).
424    pub fn mapper(&self) -> Option<&Arc<ShardMapper>> {
425        self.mapper.as_ref()
426    }
427
428    /// Get the number of active shards.
429    #[inline]
430    pub fn num_shards(&self) -> u16 {
431        self.num_shards.load(std::sync::atomic::Ordering::Acquire)
432    }
433
434    /// Get the backpressure mode.
435    #[inline]
436    pub fn backpressure_mode(&self) -> BackpressureMode {
437        self.backpressure_mode
438    }
439
440    /// Select a shard for an event based on its content hash.
441    /// Uses weighted selection if dynamic scaling is enabled.
442    ///
443    /// **Prefer [`select_shard_by_hash`].** This method serializes the
444    /// `JsonValue` to bytes just to compute the hash; if you already
445    /// have a `RawEvent` (or any pre-computed `xxh3_64` of the
446    /// canonical bytes), pass that hash directly. The internal
447    /// ingest paths all do — this method exists for ad-hoc external
448    /// callers that haven't yet adopted the `RawEvent` pattern.
449    ///
450    /// [`select_shard_by_hash`]: Self::select_shard_by_hash
451    #[inline]
452    #[deprecated(
453        since = "0.10.0",
454        note = "serializes the value just to hash it; prefer `RawEvent::from_value(v).hash()` + `select_shard_by_hash` to avoid the duplicate serialization"
455    )]
456    pub fn select_shard(&self, event: &JsonValue) -> u16 {
457        // Use xxhash for fast, deterministic hashing. `to_vec` avoids the
458        // extra UTF-8 validation that `to_string` performs on the serialized
459        // buffer, since we only need the bytes for hashing.
460        let bytes = serde_json::to_vec(event).expect("Value serialization is infallible");
461        let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
462        self.select_shard_by_hash(hash)
463    }
464
465    /// Select a shard using a pre-computed hash.
466    ///
467    /// This is faster than `select_shard` when you already have the hash.
468    #[inline]
469    pub fn select_shard_by_hash(&self, hash: u64) -> u16 {
470        if let Some(ref mapper) = self.mapper {
471            // Dynamic mode: use weighted selection
472            mapper.select_shard(hash)
473        } else {
474            // Static mode: simple modulo. Defensive guard against
475            // `num_shards == 0` — config validation rejects 0 at
476            // startup and `scale_down` requires `current > min_shards
477            // >= 1`, so this branch is unreachable today, but a stray
478            // 0 here would otherwise panic on the `%` below.
479            let num_shards = self.num_shards.load(std::sync::atomic::Ordering::Acquire);
480            debug_assert!(num_shards > 0, "num_shards must be > 0");
481            if num_shards == 0 {
482                return 0;
483            }
484            (hash % num_shards as u64) as u16
485        }
486    }
487
488    /// Resolve a shard ID to its table index, using the fast path in
489    /// static mode (shard_id == index).
490    #[inline]
491    fn resolve_idx(&self, table: &ShardTable, shard_id: u16) -> Option<usize> {
492        if self.mapper.is_none() {
493            Some(shard_id as usize)
494        } else {
495            table.shard_index.get(&shard_id).copied()
496        }
497    }
498
499    /// Push `raw` into `shard`, handling backpressure. Only clones the
500    /// bytes when `DropOldest` needs them for the retry path.
501    #[inline]
502    fn push_with_backpressure(
503        &self,
504        shard: &mut Shard,
505        shard_id: u16,
506        raw: Bytes,
507    ) -> Result<(u16, u64), IngestionError> {
508        match self.backpressure_mode {
509            BackpressureMode::DropOldest => match shard.try_push_raw(raw.clone()) {
510                Ok(ts) => Ok((shard_id, ts)),
511                Err(IngestionError::Backpressure) => {
512                    // The failed try_push_raw incremented events_dropped for
513                    // the *new* event, but the new event isn't actually
514                    // dropped — the oldest is. Correct the stats: undo the
515                    // spurious drop count, evict the oldest (which is the real
516                    // drop), and retry with the same ref-counted bytes.
517                    //
518                    // Use the producer-side `evict_oldest` rather
519                    // than `try_pop`. Calling `try_pop` from the
520                    // producer thread would violate the SPSC consumer
521                    // contract (the
522                    // legitimate consumer is the batch worker, on a
523                    // different task / thread).
524                    //
525                    // Transient stats note: a concurrent reader of
526                    // `manager.stats().events_dropped` between the
527                    // `fetch_sub` and the second `fetch_add` would
528                    // briefly observe the pre-correction value
529                    // (one less than reality). The net delta over
530                    // the whole retry is `+1`, matching the real
531                    // drop. Documented as snapshot-not-coherent
532                    // per `ShardCounters::snapshot`'s contract.
533                    shard
534                        .counters
535                        .events_dropped
536                        .fetch_sub(1, AtomicOrdering::Relaxed);
537                    let _ = shard.evict_oldest();
538                    shard
539                        .counters
540                        .events_dropped
541                        .fetch_add(1, AtomicOrdering::Relaxed);
542                    shard.try_push_raw(raw).map(|ts| (shard_id, ts))
543                }
544                Err(e) => Err(e),
545            },
546            BackpressureMode::Sample { .. } => match shard.try_push_raw(raw) {
547                Ok(ts) => Ok((shard_id, ts)),
548                Err(IngestionError::Backpressure) => Err(IngestionError::Sampled),
549                Err(e) => Err(e),
550            },
551            BackpressureMode::DropNewest | BackpressureMode::FailProducer => {
552                shard.try_push_raw(raw).map(|ts| (shard_id, ts))
553            }
554        }
555    }
556
557    /// Ingest an event into the appropriate shard.
558    pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError> {
559        // Serialize once upfront - avoids clone on retry
560        let raw = Bytes::from(serde_json::to_vec(&event)?);
561        let hash = xxhash_rust::xxh3::xxh3_64(&raw);
562        let shard_id = self.select_shard_by_hash(hash);
563
564        let table = self.table.load();
565        // Surface "no routable destination" as `Unrouted` (not
566        // `Backpressure`) and bump the manager-level
567        // `events_unrouted` counter so per-event vs. batch-path
568        // accounting agree. The secondary `table.shards.get(idx)`
569        // miss should be impossible by the `shard_index ↔ shards`
570        // invariant — keep returning `Unrouted` defensively rather
571        // than panicking.
572        let Some(idx) = self.resolve_idx(&table, shard_id) else {
573            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
574            return Err(IngestionError::Unrouted);
575        };
576        let Some(shard_lock) = table.shards.get(idx) else {
577            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
578            return Err(IngestionError::Unrouted);
579        };
580
581        let mut shard = shard_lock.lock();
582        self.push_with_backpressure(&mut shard, shard_id, raw)
583    }
584
585    /// Ingest a raw event (pre-serialized with cached hash).
586    ///
587    /// This is the fastest ingestion path:
588    /// - Uses pre-computed hash for shard selection (no serialization)
589    /// - Stores bytes directly (no clone needed, reference-counted)
590    #[inline]
591    pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError> {
592        let shard_id = self.select_shard_by_hash(event.hash());
593
594        let table = self.table.load();
595        // See `ingest` above for the `Unrouted` rationale.
596        let Some(idx) = self.resolve_idx(&table, shard_id) else {
597            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
598            return Err(IngestionError::Unrouted);
599        };
600        let Some(shard_lock) = table.shards.get(idx) else {
601            self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
602            return Err(IngestionError::Unrouted);
603        };
604
605        let mut shard = shard_lock.lock();
606        self.push_with_backpressure(&mut shard, shard_id, event.bytes())
607    }
608
609    /// Ingest a batch of pre-serialized events, grouped by shard.
610    ///
611    /// Each destination shard's mutex is acquired once and all of that
612    /// shard's events are pushed before releasing. With a uniform hash
613    /// distribution this amortizes lock acquisitions from O(events) to
614    /// O(shards). Backpressure semantics match per-event `ingest_raw`.
615    ///
616    /// Returns `(success, unrouted)` where `success` is the count of
617    /// events successfully pushed onto a shard's ring buffer and
618    /// `unrouted` is the count of events whose destination shard was
619    /// not present in the routing table at the time of dispatch
620    /// (e.g. concurrent scale-down). The remainder
621    /// (`total - success - unrouted`) is the backpressure-class drop
622    /// count.
623    ///
624    /// Returns `(success, unrouted)` rather than just `success`
625    /// so the bus can subtract `unrouted` before publishing
626    /// `events_dropped`. Returning only `success` would let the
627    /// bus's `dropped = total - success` accounting double-count
628    /// unrouted events — they're already tallied on
629    /// `events_unrouted` inside this function.
630    pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize) {
631        if events.is_empty() {
632            return (0, 0);
633        }
634
635        let table = self.table.load();
636
637        // Bucket by table index. Using a Vec<Vec<_>> keyed by index is
638        // cheaper than a HashMap for the common case of a small
639        // shard count.
640        let mut groups: Vec<Vec<Bytes>> = (0..table.shards.len()).map(|_| Vec::new()).collect();
641        let mut group_ids: Vec<u16> = vec![0; groups.len()];
642
643        let mut unrouted = 0usize;
644        for event in events {
645            let shard_id = self.select_shard_by_hash(event.hash());
646            let Some(idx) = self.resolve_idx(&table, shard_id) else {
647                // Routing table doesn't contain the chosen shard
648                // (e.g. concurrent scale-down removed it). The drop
649                // can't be attributed to a per-shard counter; track
650                // it on the manager-level `events_unrouted` so
651                // bus-level vs. per-shard reconciliation is exact.
652                unrouted += 1;
653                continue;
654            };
655            if let Some(g) = groups.get_mut(idx) {
656                if g.is_empty() {
657                    group_ids[idx] = shard_id;
658                }
659                g.push(event.bytes());
660            }
661        }
662        if unrouted > 0 {
663            self.events_unrouted
664                .fetch_add(unrouted as u64, AtomicOrdering::Relaxed);
665        }
666
667        let mut success = 0usize;
668        for (idx, group) in groups.into_iter().enumerate() {
669            if group.is_empty() {
670                continue;
671            }
672            let shard_id = group_ids[idx];
673            let Some(shard_lock) = table.shards.get(idx) else {
674                continue;
675            };
676            let mut shard = shard_lock.lock();
677            for bytes in group {
678                if self
679                    .push_with_backpressure(&mut shard, shard_id, bytes)
680                    .is_ok()
681                {
682                    success += 1;
683                }
684            }
685        }
686
687        (success, unrouted)
688    }
689
690    /// Get a reference to a shard by ID.
691    pub fn shard(&self, id: u16) -> Option<ShardRef> {
692        let table = self.table.load();
693        let idx = self.resolve_idx(&table, id)?;
694        let shard = table.shards.get(idx)?.clone();
695        Some(ShardRef { shard })
696    }
697
698    /// Execute a function with exclusive access to a shard.
699    pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
700    where
701        F: FnOnce(&mut Shard) -> R,
702    {
703        let table = self.table.load();
704        let idx = self.resolve_idx(&table, id)?;
705        table.shards.get(idx).map(|shard_lock| {
706            let mut shard = shard_lock.lock();
707            f(&mut shard)
708        })
709    }
710
711    /// Returns true if every shard's ring buffer is empty.
712    ///
713    /// Cheaper than `shard_ids()` + repeated `with_shard`: loads the
714    /// routing table once and checks each shard behind a brief lock.
715    pub fn all_shards_empty(&self) -> bool {
716        let table = self.table.load();
717        table.shards.iter().all(|s| s.lock().is_empty())
718    }
719
720    /// Iterate over all active shard IDs.
721    pub fn shard_ids(&self) -> Vec<u16> {
722        self.table.load().shard_index.keys().copied().collect()
723    }
724
725    /// Sum of `len()` across every shard's ring buffer.
726    pub fn total_pending_in_rings(&self) -> u64 {
727        let table = self.table.load();
728        table.shards.iter().map(|s| s.lock().len() as u64).sum()
729    }
730
731    /// Best-effort variant of [`Self::total_pending_in_rings`] that
732    /// never blocks: every shard whose mutex is currently held is
733    /// skipped (counted as zero). Use this from `Drop` or any path
734    /// that may run on a thread already holding a shard lock
735    /// (single-thread runtime + panic during shutdown is the
736    /// canonical hazard); the blocking variant would self-deadlock
737    /// there.
738    ///
739    /// Returns `(sum_counted, uncounted_shard_count)` so the caller
740    /// can log the uncertainty in the result.
741    pub fn try_total_pending_in_rings(&self) -> (u64, usize) {
742        let table = self.table.load();
743        let mut sum: u64 = 0;
744        let mut uncounted: usize = 0;
745        for s in table.shards.iter() {
746            match s.try_lock() {
747                Some(guard) => sum += guard.len() as u64,
748                None => uncounted += 1,
749            }
750        }
751        (sum, uncounted)
752    }
753
754    /// Get aggregated statistics from all shards.
755    ///
756    /// Lock-free: reads each shard's atomic counters directly via the
757    /// parallel `counters` vector on the routing table, with no per-
758    /// shard mutex acquisition. `events_unrouted` is sourced from the
759    /// `ShardManager` itself rather than the per-shard counters since
760    /// unrouted events have no shard to attribute to.
761    pub fn stats(&self) -> ShardStats {
762        let table = self.table.load();
763        let mut total = ShardStats::default();
764        for counters in table.counters.iter() {
765            let snap = counters.snapshot();
766            total.events_ingested += snap.events_ingested;
767            total.events_dropped += snap.events_dropped;
768            total.batches_dispatched += snap.batches_dispatched;
769        }
770        total.events_unrouted = self.events_unrouted.load(AtomicOrdering::Relaxed);
771        total
772    }
773
774    /// Rebuild the routing table with a closure that sees the old
775    /// `(shards, counters, shard_index)` and produces the new ones.
776    /// Serialized by `rebuild_lock` so concurrent scaling operations
777    /// can't race on read-modify-write of the table.
778    fn rebuild_table<F>(&self, f: F)
779    where
780        F: FnOnce(
781            &Vec<Arc<parking_lot::Mutex<Shard>>>,
782            &Vec<Arc<ShardCounters>>,
783            &std::collections::HashMap<u16, usize>,
784        ) -> ShardTable,
785    {
786        let _guard = self.rebuild_lock.lock();
787        let old = self.table.load();
788        let new = f(&old.shards, &old.counters, &old.shard_index);
789        self.table.store(Arc::new(new));
790    }
791
792    /// Add a new shard (for dynamic scaling).
793    /// Returns the new shard ID. The shard is in the routing table
794    /// and ready to be the destination of `select_shard` calls
795    /// **only after** [`activate_shard`] is called for it.
796    ///
797    /// Previously the mapper marked the shard `Active` *before* the
798    /// routing table was rebuilt and *before* any worker was wired up
799    /// to drain its ring buffer. Producers could `select_shard` to
800    /// the new id, push into its ring buffer, and have the events
801    /// stranded with no consumer. The fix uses
802    /// `scale_up_provisioning` so the mapper records the shard but
803    /// `select_shard` skips it, then `activate_shard` flips it to
804    /// `Active` once workers are ready.
805    ///
806    /// [`activate_shard`]: Self::activate_shard
807    pub fn add_shard(&self) -> Result<u16, ScalingError> {
808        self.add_shard_inner(false)
809    }
810
811    /// Like [`add_shard`] but bypasses the auto-scaling cooldown.
812    ///
813    /// Used by operator-initiated `manual_scale_up` paths. The
814    /// auto-scaling cooldown protects against the auto-scaling
815    /// monitor reacting too quickly to transient load spikes;
816    /// a deliberate operator action should not be rate-limited
817    /// by that cadence. The `max_shards` budget check still
818    /// applies.
819    ///
820    /// [`add_shard`]: Self::add_shard
821    pub fn add_shard_force(&self) -> Result<u16, ScalingError> {
822        self.add_shard_inner(true)
823    }
824
825    fn add_shard_inner(&self, force: bool) -> Result<u16, ScalingError> {
826        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
827            "Dynamic scaling not enabled".into(),
828        ))?;
829
830        // Allocate the shard in `Provisioning` state — not yet
831        // selectable.
832        let new_ids = if force {
833            mapper.scale_up_provisioning_force(1)?
834        } else {
835            mapper.scale_up_provisioning(1)?
836        };
837        let new_id = new_ids[0];
838
839        let metrics = mapper.metrics_collector(new_id).ok_or_else(|| {
840            ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", new_id))
841        })?;
842        let new_shard = Shard::with_metrics(new_id, self.ring_buffer_capacity, metrics);
843        let new_counters = new_shard.counters();
844        let new_shard = Arc::new(parking_lot::Mutex::new(new_shard));
845
846        // Publish to the routing table so `with_shard` works (the
847        // drain worker the caller is about to spawn needs this) but
848        // the shard is still `Provisioning` so `select_shard` will
849        // not route producer pushes to it yet.
850        self.rebuild_table(|shards, counters, shard_index| {
851            let mut shards = shards.clone();
852            let mut counters = counters.clone();
853            let mut shard_index = shard_index.clone();
854            let idx = shards.len();
855            shards.push(new_shard.clone());
856            counters.push(new_counters.clone());
857            shard_index.insert(new_id, idx);
858            ShardTable {
859                shards,
860                counters,
861                shard_index,
862            }
863        });
864
865        // Don't bump `num_shards` yet — `activate_shard` does that
866        // when the shard becomes selectable.
867        Ok(new_id)
868    }
869
870    /// Activate a previously-provisioned shard. After this returns,
871    /// `select_shard` will route to the shard and producer pushes
872    /// will land in its ring buffer.
873    ///
874    /// Idempotent: calling on an already-`Active` shard is `Ok(())`.
875    ///
876    /// Pre-fix this unconditionally `fetch_add(1)`d
877    /// `num_shards` even when the mapper's `activate()` early-
878    /// returned for an already-`Active` shard. After repeated
879    /// activate calls, `num_shards` exceeded both the mapper's
880    /// `active_count` and the actual shard count, breaking
881    /// modulo-based shard selection (`select_shard`) and
882    /// producing stale routing decisions.  Post-fix gates the
883    /// `fetch_add` on the mapper's transition signal.
884    pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
885        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
886            "Dynamic scaling not enabled".into(),
887        ))?;
888        let transitioned = mapper.activate(shard_id)?;
889        if transitioned {
890            self.num_shards
891                .fetch_add(1, std::sync::atomic::Ordering::Release);
892        }
893        Ok(())
894    }
895
896    /// Start draining a shard (for dynamic scaling).
897    ///
898    /// Previously only flipped the metrics collector's `draining`
899    /// atomic, leaving `MappedShard.state` untouched. Result:
900    /// `select_shard` (which filters on `state == Active`) still
901    /// routed new producers to the shard. The fix calls into the
902    /// mapper, which atomically transitions the state to `Draining`
903    /// and (for accounting) decrements `active_count`, mirroring
904    /// `scale_down(N)` for a single targeted shard.
905    pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
906        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
907            "Dynamic scaling not enabled".into(),
908        ))?;
909        mapper.drain_specific(shard_id)
910    }
911
912    /// Remove a shard from the routing table.
913    ///
914    /// Previously this only unmapped the shard from the routing
915    /// table. The drain worker, on its next `with_shard` call,
916    /// observed `None` and exited — leaving any events still in the
917    /// ring buffer permanently stranded. The fix drains the ring
918    /// buffer into a caller-supplied scratch `Vec` **before** the
919    /// unmap, then returns the drained events so the caller
920    /// (typically `EventBus::remove_shard_internal`) can flush them
921    /// through to the adapter rather than dropping them.
922    ///
923    /// Returns `Ok(events)` where `events` is whatever was still
924    /// queued in the ring buffer at unmap time (possibly empty).
925    /// Caller is responsible for handing those off to the adapter.
926    pub fn remove_shard(
927        &self,
928        shard_id: u16,
929    ) -> Result<Vec<crate::event::InternalEvent>, ScalingError> {
930        let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
931            "Dynamic scaling not enabled".into(),
932        ))?;
933
934        // Capture the mapper-side state *before* we unmap. This
935        // gates the `num_shards` decrement at the end so it stays
936        // symmetric with `activate_shard`'s `fetch_add`. The
937        // activate-failure rollback path (`bus.rs`) calls us on a
938        // shard that's still `Provisioning` — `add_shard` never
939        // bumped `num_shards` for it, so an unconditional
940        // `fetch_sub` here would leave the counter one below the
941        // table's actual size, breaking modulo-based shard
942        // selection. `Active` / `Draining` / `Stopped` shards all
943        // had `activate_shard` succeed against them at some point
944        // (it's the only way out of `Provisioning`), so they did
945        // bump `num_shards` and must decrement here.
946        let was_activated = matches!(
947            mapper.shard_state(shard_id),
948            Some(ShardState::Active) | Some(ShardState::Draining) | Some(ShardState::Stopped)
949        );
950
951        // Drain whatever is left in the ring buffer before unmapping.
952        // `with_shard` returns `None` once the shard is gone, so we
953        // do this *before* `rebuild_table`. We cap drain to a sane
954        // upper bound (`ring_buffer_capacity`) so a malformed shard
955        // can't pin us here forever.
956        let cap = self.ring_buffer_capacity;
957        let drained: Vec<crate::event::InternalEvent> = self
958            .with_shard(shard_id, |shard| {
959                let mut buf = Vec::with_capacity(shard.len().min(cap));
960                shard.pop_batch_into(&mut buf, cap);
961                buf
962            })
963            .unwrap_or_default();
964
965        let mut removed = false;
966        self.rebuild_table(|shards, counters, shard_index| {
967            let mut shards = shards.clone();
968            let mut counters = counters.clone();
969            let mut shard_index = shard_index.clone();
970
971            if let Some(idx) = shard_index.remove(&shard_id) {
972                removed = true;
973                shards.swap_remove(idx);
974                counters.swap_remove(idx);
975                // swap_remove moved the last element into `idx`: update its
976                // index mapping.
977                if idx < shards.len() {
978                    let moved_shard_id = shards[idx].lock().id;
979                    shard_index.insert(moved_shard_id, idx);
980                }
981            }
982
983            ShardTable {
984                shards,
985                counters,
986                shard_index,
987            }
988        });
989
990        if removed && was_activated {
991            self.num_shards
992                .fetch_sub(1, std::sync::atomic::Ordering::Release);
993        }
994
995        // Ask the mapper to drop the corresponding `MappedShard`
996        // record. Without this sweep the mapper's
997        // `shards: RwLock<Vec<MappedShard>>` would keep growing
998        // across scale-up/down cycles (every scale-up appends a
999        // fresh entry; `Stopped` entries are only removed by an
1000        // explicit `remove_specific_stopped_shard` /
1001        // `remove_stopped_shards` call). `evaluate_scaling`
1002        // filters by state but still iterates the full list, so
1003        // per-tick cost would grow with cumulative scaling history.
1004        //
1005        // The scaling monitor calls `mapper.finalize_draining()`
1006        // before invoking `bus.remove_shard_internal(id)` (which is
1007        // what calls us), so by the time we run the matching
1008        // `MappedShard` is already in `Stopped` state. We prune
1009        // ONLY this shard here, not every Stopped one — a bulk
1010        // sweep would prune sibling Stopped shards that a
1011        // sequential `manual_scale_down` is about to look up
1012        // state for in its next iteration's `remove_shard`. Once
1013        // the mapper had `None` for a sibling shard, the
1014        // `was_activated` gate above would observe it as
1015        // never-activated and skip the `num_shards` decrement,
1016        // leaving the counter one below the actual table size.
1017        mapper.remove_specific_stopped_shard(shard_id);
1018
1019        Ok(drained)
1020    }
1021
1022    /// Collect metrics from all shards (for dynamic scaling decisions).
1023    pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>> {
1024        self.mapper.as_ref().map(|m| m.collect_metrics())
1025    }
1026
1027    /// Evaluate and optionally execute scaling.
1028    pub fn evaluate_scaling(&self) -> ScalingDecision {
1029        self.mapper
1030            .as_ref()
1031            .map(|m| m.evaluate_scaling())
1032            .unwrap_or(ScalingDecision::None)
1033    }
1034}
1035
1036/// An owned handle to a shard. Holding this does not block scaling
1037/// operations; the shard stays alive via `Arc` refcount even if
1038/// removed from the table.
1039pub struct ShardRef {
1040    shard: Arc<parking_lot::Mutex<Shard>>,
1041}
1042
1043impl ShardRef {
1044    /// Lock the shard for exclusive access.
1045    pub fn lock(&self) -> parking_lot::MutexGuard<'_, Shard> {
1046        self.shard.lock()
1047    }
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052    use super::*;
1053    use serde_json::json;
1054
1055    #[test]
1056    fn test_shard_push_pop() {
1057        let mut shard = Shard::new(0, 1024);
1058
1059        let ts = shard.try_push(json!({"test": 1})).unwrap();
1060        assert!(ts > 0);
1061        assert_eq!(shard.len(), 1);
1062
1063        let event = shard.try_pop().unwrap();
1064        assert_eq!(event.shard_id, 0);
1065        assert_eq!(event.insertion_ts, ts);
1066        assert!(shard.is_empty());
1067    }
1068
1069    /// A `Shard` configured with a `ShardMetricsCollector` must feed every
1070    /// successful push into the collector so the dynamic-scaling and
1071    /// drain-finalize paths see non-zero counters. Without this wiring
1072    /// `evaluate_scaling` reads `fill_ratio == 0` for every shard and
1073    /// `finalize_draining`'s "is the ring actually empty" predicate is a
1074    /// no-op (it sees `pushes_since_drain_start == 0` regardless of
1075    /// contents).
1076    #[test]
1077    fn try_push_feeds_metrics_collector() {
1078        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1079        let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1080
1081        for i in 0..16 {
1082            shard.try_push(json!({"i": i})).unwrap();
1083        }
1084
1085        let metrics = collector.collect_and_reset();
1086        assert_eq!(
1087            metrics.event_rate, 16,
1088            "every push must increment event_rate"
1089        );
1090        assert!(metrics.fill_ratio > 0.0, "buffer length must be observable");
1091        assert!(
1092            metrics.avg_push_latency_ns > 0,
1093            "push latency must be recorded"
1094        );
1095    }
1096
1097    /// `try_total_pending_in_rings` must never block, must skip
1098    /// shards whose mutex is currently held, and must report how
1099    /// many it skipped. This is what makes `EventBus::Drop`
1100    /// safe to call on a thread that already holds a shard lock.
1101    #[test]
1102    fn try_total_pending_in_rings_skips_held_shards() {
1103        let manager = ShardManager::new(2, 1024, BackpressureMode::DropNewest);
1104        // Push some events so a non-zero count is observable.
1105        manager.ingest(json!({"i": 1})).unwrap();
1106        manager.ingest(json!({"i": 2})).unwrap();
1107        manager.ingest(json!({"i": 3})).unwrap();
1108
1109        // Uncontended: all shards counted, uncounted_shards == 0.
1110        let (sum, uncounted) = manager.try_total_pending_in_rings();
1111        assert_eq!(uncounted, 0);
1112        let baseline_sum = sum;
1113        assert!(baseline_sum > 0, "events should be pending in some shard");
1114
1115        // Hold one shard's mutex and re-check: that shard must be
1116        // skipped, uncounted must be 1, and the call must return
1117        // immediately (this test would hang on the blocking
1118        // `total_pending_in_rings` variant).
1119        let table = manager.table.load();
1120        let _guard = table.shards[0].lock();
1121        let (sum2, uncounted2) = manager.try_total_pending_in_rings();
1122        assert_eq!(uncounted2, 1, "the locked shard must be uncounted");
1123        assert!(
1124            sum2 <= baseline_sum,
1125            "sum must not include events from the locked shard"
1126        );
1127    }
1128
1129    /// Same wiring for `try_push_raw` — the byte-oriented hot path.
1130    #[test]
1131    fn try_push_raw_feeds_metrics_collector() {
1132        let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1133        let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1134
1135        for i in 0..16 {
1136            shard
1137                .try_push_raw(bytes::Bytes::from(format!("event-{i}")))
1138                .unwrap();
1139        }
1140
1141        let metrics = collector.collect_and_reset();
1142        assert_eq!(metrics.event_rate, 16);
1143        assert!(metrics.fill_ratio > 0.0);
1144        assert!(metrics.avg_push_latency_ns > 0);
1145    }
1146
1147    #[test]
1148    #[allow(deprecated)] // exercises the deprecated `select_shard` path
1149    fn test_shard_manager_routing() {
1150        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1151
1152        // Same event should always go to the same shard
1153        let event = json!({"key": "value"});
1154        let shard1 = manager.select_shard(&event);
1155        let shard2 = manager.select_shard(&event);
1156        assert_eq!(shard1, shard2);
1157
1158        // Different events may go to different shards
1159        let events: Vec<_> = (0..100).map(|i| json!({"i": i})).collect();
1160        let shards: std::collections::HashSet<_> =
1161            events.iter().map(|e| manager.select_shard(e)).collect();
1162
1163        // With 100 random events and 4 shards, we should hit multiple shards
1164        assert!(shards.len() > 1);
1165    }
1166
1167    /// Regression: the deprecated `select_shard(&JsonValue)` must produce
1168    /// the same shard id as `select_shard_by_hash` would for the
1169    /// equivalent `RawEvent`. They share underlying logic now, but if a
1170    /// future refactor splits them this test catches the divergence
1171    /// before consumers do.
1172    #[test]
1173    #[allow(deprecated)]
1174    fn test_select_shard_matches_select_shard_by_hash() {
1175        let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1176        for i in 0..200 {
1177            let v = json!({"i": i, "tag": format!("user-{i}")});
1178            let raw = RawEvent::from_value(v.clone());
1179            assert_eq!(
1180                manager.select_shard(&v),
1181                manager.select_shard_by_hash(raw.hash()),
1182                "select_shard and select_shard_by_hash must agree (i={i})"
1183            );
1184        }
1185    }
1186
1187    #[test]
1188    fn test_shard_manager_ingest() {
1189        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1190
1191        for i in 0..100 {
1192            let event = json!({"i": i});
1193            let result = manager.ingest(event);
1194            assert!(result.is_ok());
1195        }
1196
1197        let stats = manager.stats();
1198        assert_eq!(stats.events_ingested, 100);
1199        assert_eq!(stats.events_dropped, 0);
1200    }
1201
1202    #[test]
1203    fn test_backpressure_drop_newest() {
1204        let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1205
1206        // Fill the buffer (capacity 4, usable 3)
1207        for i in 0..3 {
1208            manager.ingest(json!({"i": i})).unwrap();
1209        }
1210
1211        // Next insert should fail
1212        let result = manager.ingest(json!({"i": 999}));
1213        assert!(matches!(result, Err(IngestionError::Backpressure)));
1214
1215        let stats = manager.stats();
1216        assert_eq!(stats.events_ingested, 3);
1217        assert_eq!(stats.events_dropped, 1);
1218    }
1219
1220    #[test]
1221    fn test_backpressure_drop_oldest() {
1222        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1223
1224        // Fill the buffer
1225        for i in 0..3 {
1226            manager.ingest(json!({"i": i})).unwrap();
1227        }
1228
1229        // Next insert should succeed by dropping oldest
1230        let result = manager.ingest(json!({"i": 999}));
1231        assert!(result.is_ok());
1232
1233        // Verify the oldest was dropped
1234        let shard = manager.shard(0).unwrap();
1235        let events = shard.lock().pop_batch(10);
1236
1237        // Should have events 1, 2, 999 (0 was dropped)
1238        assert_eq!(events.len(), 3);
1239        assert_eq!(events[0].parse().unwrap(), json!({"i": 1}));
1240        assert_eq!(events[2].parse().unwrap(), json!({"i": 999}));
1241    }
1242
1243    #[test]
1244    fn test_raw_event_ingestion() {
1245        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1246
1247        for i in 0..100 {
1248            let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1249            let result = manager.ingest_raw(raw);
1250            assert!(result.is_ok());
1251        }
1252
1253        let stats = manager.stats();
1254        assert_eq!(stats.events_ingested, 100);
1255        assert_eq!(stats.events_dropped, 0);
1256    }
1257
1258    /// `ingest_raw_batch` groups events by destination shard before
1259    /// pushing — verify the grouping preserves FIFO within a shard,
1260    /// honors hash-based routing, and that totals match `ingest_raw`.
1261    #[test]
1262    fn test_ingest_raw_batch_routes_and_preserves_order() {
1263        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1264        let events: Vec<RawEvent> = (0..200)
1265            .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1266            .collect();
1267
1268        // Snapshot the expected destination for each event so we can
1269        // compare against what actually landed in each shard.
1270        let expected_dests: Vec<u16> = events
1271            .iter()
1272            .map(|e| manager.select_shard_by_hash(e.hash()))
1273            .collect();
1274
1275        let (success, unrouted) = manager.ingest_raw_batch(events.clone());
1276        assert_eq!(success, 200, "all events should land with ample capacity");
1277        assert_eq!(unrouted, 0, "no scale-down so no unrouted events");
1278
1279        // Aggregate totals must match.
1280        let stats = manager.stats();
1281        assert_eq!(stats.events_ingested, 200);
1282        assert_eq!(stats.events_dropped, 0);
1283
1284        // Per-shard totals must match the expected routing distribution,
1285        // and the distribution must span more than one shard (otherwise
1286        // the test wouldn't exercise the grouping path).
1287        let mut expected_by_shard: std::collections::HashMap<u16, u64> =
1288            std::collections::HashMap::new();
1289        for d in &expected_dests {
1290            *expected_by_shard.entry(*d).or_default() += 1;
1291        }
1292        assert!(
1293            expected_by_shard.len() > 1,
1294            "hash distribution should span multiple shards"
1295        );
1296        for shard_id in 0..4u16 {
1297            let got = manager
1298                .with_shard(shard_id, |s| s.stats().events_ingested)
1299                .unwrap();
1300            let want = expected_by_shard.get(&shard_id).copied().unwrap_or(0);
1301            assert_eq!(got, want, "shard {} ingested count mismatch", shard_id);
1302        }
1303
1304        // FIFO within a shard: the events a shard received, in the order
1305        // we batched them, must come out of the ring buffer in the same
1306        // order.
1307        for shard_id in 0..4u16 {
1308            let expected_payloads: Vec<&[u8]> = events
1309                .iter()
1310                .zip(expected_dests.iter())
1311                .filter(|(_, d)| **d == shard_id)
1312                .map(|(e, _)| e.as_bytes())
1313                .collect();
1314            let popped = manager.with_shard(shard_id, |s| s.pop_batch(1024)).unwrap();
1315            assert_eq!(popped.len(), expected_payloads.len());
1316            for (i, ev) in popped.iter().enumerate() {
1317                assert_eq!(
1318                    ev.as_bytes(),
1319                    expected_payloads[i],
1320                    "shard {} position {} out of order",
1321                    shard_id,
1322                    i
1323                );
1324            }
1325        }
1326    }
1327
1328    /// Batching past a shard's capacity must account every dropped
1329    /// event under `DropNewest`: `success` + `events_dropped` =
1330    /// `len(input)`.
1331    #[test]
1332    fn test_ingest_raw_batch_drop_accounting() {
1333        // Single shard, usable capacity 3 (ring buffer reserves one slot).
1334        let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1335        let events: Vec<RawEvent> = (0..10)
1336            .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1337            .collect();
1338
1339        let (success, unrouted) = manager.ingest_raw_batch(events);
1340        assert_eq!(success, 3, "only 3 should fit under DropNewest");
1341        assert_eq!(unrouted, 0, "single-shard config has no unrouted events");
1342
1343        let stats = manager.stats();
1344        assert_eq!(stats.events_ingested, 3);
1345        assert_eq!(stats.events_dropped, 7);
1346    }
1347
1348    /// Empty batch is a no-op and must not touch stats.
1349    #[test]
1350    fn test_ingest_raw_batch_empty() {
1351        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1352        assert_eq!(manager.ingest_raw_batch(Vec::new()), (0, 0));
1353        let stats = manager.stats();
1354        assert_eq!(stats.events_ingested, 0);
1355        assert_eq!(stats.events_dropped, 0);
1356    }
1357
1358    #[test]
1359    fn test_remove_shard_requires_dynamic_scaling() {
1360        // Static mode - no dynamic scaling
1361        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1362
1363        // Should fail because dynamic scaling is not enabled
1364        let result = manager.remove_shard(0);
1365        assert!(result.is_err());
1366        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1367    }
1368
1369    #[test]
1370    fn test_add_shard_requires_dynamic_scaling() {
1371        // Static mode - no dynamic scaling
1372        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1373
1374        // Should fail because dynamic scaling is not enabled
1375        let result = manager.add_shard();
1376        assert!(result.is_err());
1377        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1378    }
1379
1380    #[test]
1381    fn test_drain_shard_requires_dynamic_scaling() {
1382        // Static mode - no dynamic scaling
1383        let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1384
1385        // Should fail because dynamic scaling is not enabled
1386        let result = manager.drain_shard(0);
1387        assert!(result.is_err());
1388        assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1389    }
1390
1391    #[test]
1392    fn test_drop_oldest_counts_dropped_events() {
1393        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1394
1395        // Fill the buffer (capacity 4, usable 3)
1396        for i in 0..3 {
1397            manager.ingest(json!({"i": i})).unwrap();
1398        }
1399
1400        // This should succeed by dropping the oldest event
1401        manager.ingest(json!({"i": 999})).unwrap();
1402
1403        let stats = manager.stats();
1404        assert_eq!(stats.events_ingested, 4);
1405        // The initial push fails (counted as dropped), then retry succeeds
1406        assert_eq!(
1407            stats.events_dropped, 1,
1408            "DropOldest cycle should count exactly one drop"
1409        );
1410    }
1411
1412    #[test]
1413    fn test_drop_oldest_raw_counts_dropped_events() {
1414        let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1415
1416        // Fill the buffer
1417        for i in 0..3 {
1418            let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1419            manager.ingest_raw(raw).unwrap();
1420        }
1421
1422        // This should succeed by dropping the oldest event
1423        let raw = RawEvent::from_str(r#"{"i": 999}"#);
1424        manager.ingest_raw(raw).unwrap();
1425
1426        let stats = manager.stats();
1427        assert_eq!(stats.events_ingested, 4);
1428        assert_eq!(
1429            stats.events_dropped, 1,
1430            "DropOldest cycle should count exactly one drop"
1431        );
1432    }
1433
1434    /// Pin the current contract for `BackpressureMode::Sample`:
1435    /// it returns `IngestionError::Sampled` once the buffer fills,
1436    /// indistinguishable in shape from a `Backpressure` rejection.
1437    /// Sampling itself ("keep 1 in N events") is **not implemented**
1438    /// — the comments in `ingest` / `ingest_raw` defer it to "a
1439    /// higher level" that does not exist. A consumer setting this
1440    /// mode today gets a rejection signal, never probabilistic
1441    /// admission.
1442    ///
1443    /// This test pins that contract so it cannot quietly change
1444    /// without an explicit decision. If sampling is ever wired up,
1445    /// this test will fail and force an update — at which point
1446    /// the implementer should also add coverage for the
1447    /// rate-proportional admission rate.
1448    #[test]
1449    fn sample_mode_currently_returns_sampled_after_buffer_fills() {
1450        // TODO(coverage round 2): `BackpressureMode::Sample` is
1451        // dead-on-arrival until "higher level" sampling lands;
1452        // see comments at `ShardManager::ingest` / `ingest_raw`.
1453        let manager = ShardManager::new(1, 4, BackpressureMode::Sample { rate: 2 });
1454
1455        // Fill the buffer (capacity 4, usable 3).
1456        for i in 0..3 {
1457            manager.ingest(json!({"i": i})).unwrap();
1458        }
1459
1460        // Both ingest paths must report `Sampled` — not `Backpressure`,
1461        // not `Ok` — so callers can distinguish the (currently
1462        // unused) sampling rejection from a hard backpressure
1463        // rejection in case sampling is wired up later.
1464        let json_result = manager.ingest(json!({"i": 999}));
1465        assert!(
1466            matches!(json_result, Err(IngestionError::Sampled)),
1467            "Sample mode must return Sampled on a full buffer (got {:?})",
1468            json_result
1469        );
1470
1471        let raw_result = manager.ingest_raw(RawEvent::from_str(r#"{"i": 999}"#));
1472        assert!(
1473            matches!(raw_result, Err(IngestionError::Sampled)),
1474            "Sample mode must return Sampled on a full buffer via ingest_raw (got {:?})",
1475            raw_result
1476        );
1477    }
1478
/// Under `DropOldest`, every push into an already-full ring must
/// succeed and be counted as exactly one drop.
#[test]
fn test_drop_oldest_multiple_cycles() {
    let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);

    // Eight sequential ingests: events 0..3 fill the ring (usable
    // capacity 3); events 3..8 each arrive at a full ring and so each
    // triggers one DropOldest cycle.
    for i in 0..8 {
        manager.ingest(json!({"i": i})).unwrap();
    }

    let snapshot = manager.stats();
    // Every ingest call is counted as ingested, including the five
    // that forced a drop.
    assert_eq!(snapshot.events_ingested, 8);
    assert_eq!(
        snapshot.events_dropped, 5,
        "each DropOldest cycle should count one drop"
    );
}
1500
/// Regression guard for BUG_REPORT.md #44.
///
/// Background (per the bug report): the single-event ingest paths
/// (`ingest`, `ingest_raw`) used to collapse "shard not in routing
/// table" into `IngestionError::Backpressure` and never touched
/// `events_unrouted`, whereas the batch path bumped the counter.
/// Reconciliation drifted because of that divergence.
///
/// What this test pins is the *negative* half of the contract: with
/// one of two shards scaled down, every `ingest_raw` still lands on
/// the remaining Active shard and `events_unrouted` stays at zero,
/// i.e. `Unrouted` does not fire without a real routing miss.
///
/// The *positive* half (a real routing miss producing `Unrouted` and
/// a counter bump) is deliberately not exercised here; see the note
/// at the end of the body for why.
#[test]
fn ingest_single_event_unrouted_increments_counter() {
    use crate::config::ScalingPolicy;

    // `min_shards: 1` lets us scale a 2-shard manager down to a
    // single Active shard.
    // NOTE(review): the 1ns cooldown presumably keeps the scaling
    // call from being rejected as "too soon" — confirm against the
    // mapper's cooldown handling.
    let policy = ScalingPolicy {
        min_shards: 1,
        max_shards: 8,
        cooldown: std::time::Duration::from_nanos(1),
        ..Default::default()
    };
    let manager =
        ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();

    // Take one of the two shards out of rotation through the public
    // mapper API.
    let mapper = manager.mapper().unwrap().clone();
    let _ = mapper.scale_down(1).unwrap();

    // Exactly one shard should remain Active. The original author
    // notes that with a single Active shard the weighted selection
    // always picks it, so the drained shard cannot be targeted via
    // hash routing; what can be verified is that the survivor keeps
    // routing correctly.
    let remaining = mapper.active_shard_ids();
    assert_eq!(remaining.len(), 1);
    let active_id = remaining[0];

    // A handful of single-event ingests: each must be accepted and
    // report the Active shard's id.
    for seq in 0..5 {
        let payload = format!(r#"{{"i":{}}}"#, seq);
        let (routed_to, _) = manager
            .ingest_raw(RawEvent::from_str(&payload))
            .expect("active shard must accept ingest");
        assert_eq!(routed_to, active_id, "must route to the active shard");
    }

    // None of those were routing misses, so the unrouted counter
    // must not have moved.
    assert_eq!(manager.stats().events_unrouted, 0);

    // Why there is no routing-miss assertion here (summarising the
    // original author's notes):
    //
    // - The #44 path triggers when *no* Active shard exists and
    //   `select_shard` hands back `u16::MAX` (per #51), which is
    //   never in the routing table.
    // - That state cannot be reached from this test through public
    //   API: draining is refused once it would take the active count
    //   below `min_shards`, and in dynamic mode the mapper (not a
    //   hash modulo) decides routing, so a forged `RawEvent` hash
    //   cannot produce a non-existent id.
    // - Forcing it would need a `#[cfg(test)]` hook that mutates the
    //   routing table, which is deliberately avoided so the
    //   manager's invariants hold even under test.
    //
    // The #44 + #51 contract is therefore covered by this sanity
    // check together with the mapper-level test
    // `select_shard_does_not_fall_back_to_draining`.
}
1584
/// Regression guard for BUG_REPORT.md #47.
///
/// Before the fix, `remove_shard` only unmapped the shard from the
/// routing table and relied on the drain worker seeing
/// `with_shard → None` and exiting, which silently stranded whatever
/// was still sitting in the ring buffer. The fix hands those events
/// back to the caller (typically `EventBus::remove_shard_internal`)
/// so they can be flushed to the adapter instead of being lost.
#[test]
fn remove_shard_returns_stranded_ring_buffer_events() {
    use crate::config::ScalingPolicy;

    let manager = ShardManager::with_mapper(
        2,
        1024,
        BackpressureMode::DropNewest,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 8,
            cooldown: std::time::Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .unwrap();

    // Seed shard 1 directly through `with_shard`. Going around
    // `select_shard` means the test does not depend on finding a
    // hash that happens to route to shard 1.
    let pushed = [r#"{"a":1}"#, r#"{"a":2}"#, r#"{"a":3}"#];
    let pushed_count = pushed.len();
    for payload in &pushed {
        let push_outcome = manager.with_shard(1, |shard| {
            shard.try_push_raw(bytes::Bytes::from(payload.as_bytes().to_vec()))
        });
        push_outcome
            .expect("shard 1 exists")
            .expect("ring buffer has room");
    }

    let queued = manager.with_shard(1, |shard| shard.len()).unwrap();
    assert_eq!(queued, pushed_count, "events should be queued in shard 1");

    // The removal itself: everything still queued has to come back
    // to us rather than vanish.
    let stranded = manager
        .remove_shard(1)
        .expect("remove_shard must succeed in dynamic mode");
    assert_eq!(
        stranded.len(),
        pushed_count,
        "remove_shard must surface every event still in the \
         ring buffer (#47); got {} stranded events, expected {}",
        stranded.len(),
        pushed_count
    );

    // Returned events must preserve FIFO order and the exact bytes
    // the producer pushed, and still be tagged with shard 1. Lengths
    // were asserted equal above, so pairing the two sequences visits
    // every element of both.
    for (event, expected) in stranded.iter().zip(pushed.iter()) {
        assert_eq!(event.as_bytes(), expected.as_bytes());
        assert_eq!(event.shard_id, 1);
    }

    // After removal, shard 1 can no longer be looked up.
    let lookup_after_removal = manager.with_shard(1, |shard| shard.id);
    assert!(lookup_after_removal.is_none());
}
1648
/// `ShardManager::activate_shard` is idempotent at the API level:
/// repeated calls on the same shard each return `Ok(())`. Before the
/// fix, however, `num_shards` was incremented on every call, even
/// when the mapper's `activate()` had already moved the shard to
/// Active. Repeated calls therefore pushed `num_shards` past the real
/// shard count, and `select_shard`'s modulo arithmetic mis-routed.
#[test]
fn activate_shard_is_idempotent_in_num_shards_count() {
    let manager = ShardManager::with_mapper(
        2,
        1024,
        BackpressureMode::DropOldest,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: std::time::Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .expect("dynamic scaling enabled");
    assert_eq!(manager.num_shards(), 2);

    // Register a new shard and activate it once: count goes 2 → 3.
    let new_id = manager.add_shard().expect("add_shard");
    manager.activate_shard(new_id).expect("first activate");
    assert_eq!(
        manager.num_shards(),
        3,
        "first activate must bump num_shards to 3"
    );

    // Two further activations of the same shard must succeed and
    // leave the count untouched.
    for attempt in [
        "second activate (idempotent)",
        "third activate (idempotent)",
    ] {
        manager.activate_shard(new_id).expect(attempt);
    }
    assert_eq!(
        manager.num_shards(),
        3,
        "repeated activate_shard must NOT keep bumping num_shards; \
         pre-fix this would be 5 after three calls",
    );
}
1692
/// `remove_shard` on a shard that never left `Provisioning` (the
/// activate-failure rollback path) must leave `num_shards` alone.
///
/// `add_shard` only registers a `Provisioning` entry and does not
/// touch `num_shards`; the increment belongs to `activate_shard`. An
/// unconditional `fetch_sub` in `remove_shard` would therefore drop
/// the counter one below the routing table's real size after a
/// rollback and break modulo-based shard selection.
///
/// The test pins both sides of the gate: removing a never-activated
/// shard is a `num_shards` no-op, while removing an activated shard
/// still decrements.
#[test]
fn remove_provisioning_shard_does_not_decrement_num_shards() {
    let manager = ShardManager::with_mapper(
        2,
        1024,
        BackpressureMode::DropOldest,
        ScalingPolicy {
            min_shards: 1,
            max_shards: 16,
            cooldown: std::time::Duration::from_nanos(1),
            ..Default::default()
        },
    )
    .expect("dynamic scaling enabled");
    let baseline = manager.num_shards();
    assert_eq!(baseline, 2);

    // Step 1: add_shard creates a Provisioning entry; the count must
    // not move yet.
    let provisioning_id = manager.add_shard().expect("add_shard");
    assert_eq!(
        manager.num_shards(),
        baseline,
        "add_shard must NOT bump num_shards (Provisioning, not yet selectable)"
    );

    // Step 2: mimic the activate-failure rollback by removing the
    // shard without ever activating it. Pre-fix, this ran
    // `fetch_sub(1)` unconditionally and left num_shards below the
    // table size.
    let leftovers = manager
        .remove_shard(provisioning_id)
        .expect("rollback remove");
    assert!(
        leftovers.is_empty(),
        "fresh provisioning shard has no events"
    );
    assert_eq!(
        manager.num_shards(),
        baseline,
        "removing a provisioning (never-activated) shard must NOT decrement num_shards"
    );

    // Step 3 (companion case): an activated shard must still
    // decrement on removal, mirroring activate_shard's fetch_add.
    let activated_id = manager.add_shard().expect("add for activated path");
    manager.activate_shard(activated_id).expect("activate");
    assert_eq!(
        manager.num_shards(),
        baseline + 1,
        "activate bumps num_shards"
    );

    manager
        .remove_shard(activated_id)
        .expect("remove activated");
    assert_eq!(
        manager.num_shards(),
        baseline,
        "removing an activated shard MUST decrement num_shards"
    );
}
1757}