net/shard/mod.rs
1//! Shard management for parallel event ingestion.
2//!
3//! The shard module provides:
4//! - Lock-free ring buffers for high-throughput event queuing
5//! - Per-shard timestamp generation (no cross-shard contention)
6//! - Batch assembly with adaptive sizing
7//! - Shard manager for coordinating multiple shards
8//! - Dynamic shard scaling with weighted producer routing
9
10mod batch;
11mod mapper;
12mod ring_buffer;
13
14pub use batch::{AdaptiveBatcher, BatchWorker};
15pub use mapper::{
16 ScalingDecision, ScalingError, ShardMapper, ShardMetrics, ShardMetricsCollector, ShardState,
17};
18// `RingBuffer` and `BufferFullError` are intentionally NOT re-exported.
19// External callers go through `EventBus` / `ShardManager`, which
20// uphold the SPSC contract via `Mutex<Shard>`. Exposing the raw ring
21// buffer publicly was a silent-UB footgun — anyone wrapping it in an
22// `Arc` and pushing from two threads got data corruption with no
23// compile-time signal. `BufferFullError` is not
24// re-exported here either: callers see it as `IngestionError::Backpressure`.
25pub(crate) use ring_buffer::RingBuffer;
26
27// Re-export ScalingPolicy from config for convenience
28pub use crate::config::ScalingPolicy;
29
30use bytes::Bytes;
31
32use crate::config::BackpressureMode;
33use crate::error::IngestionError;
34use crate::event::{InternalEvent, RawEvent};
35use crate::timestamp::TimestampGenerator;
36
37use serde_json::Value as JsonValue;
38use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
39use std::sync::Arc;
40use std::time::Instant;
41
42/// Atomic counters for a single shard. Kept outside `Shard` as `Arc`s
43/// so `ShardManager::stats()` can aggregate them without locking each
44/// shard's mutex.
45#[derive(Debug, Default)]
46pub struct ShardCounters {
47 /// Total events ingested into this shard.
48 pub events_ingested: AtomicU64,
49 /// Events dropped due to backpressure.
50 pub events_dropped: AtomicU64,
51 /// Batches successfully dispatched to the adapter.
52 pub batches_dispatched: AtomicU64,
53}
54
55/// Statistics for a single shard (snapshot).
56#[derive(Debug, Default, Clone, Copy)]
57pub struct ShardStats {
58 /// Total events ingested.
59 pub events_ingested: u64,
60 /// Events dropped due to backpressure.
61 pub events_dropped: u64,
62 /// Batches dispatched to adapter.
63 pub batches_dispatched: u64,
64 /// Events that arrived at `ingest_raw_batch` but had no resolvable
65 /// shard (e.g. the routing table was rebuilt mid-dispatch and the
66 /// hashed shard id is no longer present). These cannot be
67 /// attributed to a per-shard counter, so they are tracked at the
68 /// `ShardManager` level and surfaced through aggregated `stats()`.
69 pub events_unrouted: u64,
70}
71
72impl ShardCounters {
73 /// Load a consistent snapshot of the counters.
74 ///
75 /// `events_unrouted` is left at zero here — it is a manager-level
76 /// counter, not a per-shard one. `ShardManager::stats()` fills it
77 /// in after summing per-shard fields.
78 #[inline]
79 pub fn snapshot(&self) -> ShardStats {
80 ShardStats {
81 events_ingested: self.events_ingested.load(AtomicOrdering::Relaxed),
82 events_dropped: self.events_dropped.load(AtomicOrdering::Relaxed),
83 batches_dispatched: self.batches_dispatched.load(AtomicOrdering::Relaxed),
84 events_unrouted: 0,
85 }
86 }
87}
88
89/// A single shard with its own ring buffer and timestamp generator.
90pub struct Shard {
91 /// Shard identifier.
92 pub id: u16,
93 /// Ring buffer for event queuing.
94 ring_buffer: RingBuffer<InternalEvent>,
95 /// Shard-local timestamp generator (no contention).
96 timestamp_gen: TimestampGenerator,
97 /// Shared atomic counters (also referenced from `ShardTable` for
98 /// lock-free aggregation).
99 counters: Arc<ShardCounters>,
100 /// Optional metrics collector for dynamic scaling.
101 metrics_collector: Option<Arc<ShardMetricsCollector>>,
102 /// Ring buffer capacity (for metrics).
103 capacity: usize,
104}
105
106impl Shard {
107 /// Create a new shard.
108 pub fn new(id: u16, capacity: usize) -> Self {
109 Self {
110 id,
111 ring_buffer: RingBuffer::new(capacity),
112 timestamp_gen: TimestampGenerator::new(),
113 counters: Arc::new(ShardCounters::default()),
114 metrics_collector: None,
115 capacity,
116 }
117 }
118
119 /// Create a new shard with a metrics collector for dynamic scaling.
120 pub fn with_metrics(id: u16, capacity: usize, metrics: Arc<ShardMetricsCollector>) -> Self {
121 Self {
122 id,
123 ring_buffer: RingBuffer::new(capacity),
124 timestamp_gen: TimestampGenerator::new(),
125 counters: Arc::new(ShardCounters::default()),
126 metrics_collector: Some(metrics),
127 capacity,
128 }
129 }
130
131 /// Clone the atomic counter handle (for lock-free aggregation).
132 #[inline]
133 pub fn counters(&self) -> Arc<ShardCounters> {
134 self.counters.clone()
135 }
136
137 /// Set the metrics collector.
138 pub fn set_metrics_collector(&mut self, metrics: Arc<ShardMetricsCollector>) {
139 self.metrics_collector = Some(metrics);
140 }
141
142 /// Try to push a raw event (pre-serialized bytes) into the shard's ring buffer.
143 /// Returns the assigned insertion timestamp on success.
144 ///
145 /// This is the fastest ingestion path - no serialization or hashing needed.
146 #[inline]
147 pub fn try_push_raw(&mut self, raw: Bytes) -> Result<u64, IngestionError> {
148 let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
149 let ts = self.timestamp_gen.next();
150 let event = InternalEvent::new(raw, ts, self.id);
151
152 match self.ring_buffer.try_push(event) {
153 Ok(()) => {
154 self.counters
155 .events_ingested
156 .fetch_add(1, AtomicOrdering::Relaxed);
157 if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
158 collector.record_push(start.elapsed().as_nanos() as u64);
159 collector.record_buffer_len(self.ring_buffer.len());
160 }
161 Ok(ts)
162 }
163 Err(_) => {
164 self.counters
165 .events_dropped
166 .fetch_add(1, AtomicOrdering::Relaxed);
167 Err(IngestionError::Backpressure)
168 }
169 }
170 }
171
172 /// Try to push a JSON value into the shard's ring buffer.
173 /// Returns the assigned insertion timestamp on success.
174 ///
175 /// This serializes the value once before storing.
176 #[inline]
177 pub fn try_push(&mut self, raw: JsonValue) -> Result<u64, IngestionError> {
178 let push_start = self.metrics_collector.as_ref().map(|_| Instant::now());
179 let ts = self.timestamp_gen.next();
180 let event = InternalEvent::from_value(raw, ts, self.id);
181
182 match self.ring_buffer.try_push(event) {
183 Ok(()) => {
184 self.counters
185 .events_ingested
186 .fetch_add(1, AtomicOrdering::Relaxed);
187 if let (Some(collector), Some(start)) = (&self.metrics_collector, push_start) {
188 collector.record_push(start.elapsed().as_nanos() as u64);
189 collector.record_buffer_len(self.ring_buffer.len());
190 }
191 Ok(ts)
192 }
193 Err(_) => {
194 self.counters
195 .events_dropped
196 .fetch_add(1, AtomicOrdering::Relaxed);
197 Err(IngestionError::Backpressure)
198 }
199 }
200 }
201
202 /// Pop a batch of events from the ring buffer.
203 ///
204 /// Allocates a fresh `Vec`. Prefer [`pop_batch_into`] in drain
205 /// loops where the per-cycle `Vec` allocation should happen
206 /// outside the shard mutex.
207 ///
208 /// [`pop_batch_into`]: Self::pop_batch_into
209 #[inline]
210 pub fn pop_batch(&mut self, max: usize) -> Vec<InternalEvent> {
211 let out = self.ring_buffer.pop_batch(max);
212 if let Some(collector) = &self.metrics_collector {
213 collector.record_buffer_len(self.ring_buffer.len());
214 }
215 out
216 }
217
218 /// Pop a batch of events into a caller-owned buffer.
219 ///
220 /// Append semantics: does **not** clear `dst`; reserves
221 /// `count` slots and pushes drained elements onto the end.
222 /// Returns the number drained this call. Use this in
223 /// steady-state drain loops where the caller keeps a scratch
224 /// `Vec` across cycles, so the per-cycle allocation moves out
225 /// of the consumer's critical section.
226 #[inline]
227 pub fn pop_batch_into(&mut self, dst: &mut Vec<InternalEvent>, max: usize) -> usize {
228 let n = self.ring_buffer.pop_batch_into(dst, max);
229 if let Some(collector) = &self.metrics_collector {
230 collector.record_buffer_len(self.ring_buffer.len());
231 }
232 n
233 }
234
235 /// Try to pop a single event from the ring buffer.
236 #[inline]
237 pub fn try_pop(&mut self) -> Option<InternalEvent> {
238 let out = self.ring_buffer.try_pop();
239 if let Some(collector) = &self.metrics_collector {
240 collector.record_buffer_len(self.ring_buffer.len());
241 }
242 out
243 }
244
245 /// Producer-side eviction of the oldest event.
246 ///
247 /// Used by `BackpressureMode::DropOldest` to make room for a
248 /// new push when the buffer is full. Bypasses the ring buffer's
249 /// consumer-thread tracking (the producer thread is calling
250 /// what is normally a consumer-side operation). Safe because
251 /// the outer shard mutex serializes this against any concurrent
252 /// `try_pop` from the legitimate consumer (the batch worker).
253 #[inline]
254 pub(crate) fn evict_oldest(&mut self) -> Option<InternalEvent> {
255 self.ring_buffer.evict_oldest()
256 }
257
258 /// Get the current buffer length.
259 #[inline]
260 pub fn len(&self) -> usize {
261 self.ring_buffer.len()
262 }
263
264 /// Check if the buffer is empty.
265 #[inline]
266 pub fn is_empty(&self) -> bool {
267 self.ring_buffer.is_empty()
268 }
269
270 /// Check if the buffer is full.
271 #[inline]
272 pub fn is_full(&self) -> bool {
273 self.ring_buffer.is_full()
274 }
275
276 /// Get the fill ratio (0.0 - 1.0).
277 #[inline]
278 pub fn fill_ratio(&self) -> f64 {
279 if self.capacity == 0 {
280 0.0
281 } else {
282 self.ring_buffer.len() as f64 / self.capacity as f64
283 }
284 }
285
286 /// Get the ring buffer capacity.
287 #[inline]
288 pub fn capacity(&self) -> usize {
289 self.capacity
290 }
291
292 /// Get a snapshot of shard statistics.
293 pub fn stats(&self) -> ShardStats {
294 self.counters.snapshot()
295 }
296
297 /// Record a batch dispatch.
298 pub fn record_batch_dispatch(&self) {
299 self.counters
300 .batches_dispatched
301 .fetch_add(1, AtomicOrdering::Relaxed);
302 }
303}
304
305/// Immutable routing table: shards + index + counter handles.
306///
307/// Placed behind an `ArcSwap` on `ShardManager` so the common read
308/// path (`ingest`, `ingest_raw`, `with_shard`, `stats`) is
309/// lock-free. Rebuilt on scale up/down via RCU-style swap.
310pub struct ShardTable {
311 /// All shards, indexed by position. `Arc<Mutex<Shard>>` lets a new
312 /// table share shard handles with the previous table (cheap Arc
313 /// clones during rebuild).
314 shards: Vec<Arc<parking_lot::Mutex<Shard>>>,
315 /// Parallel vector of counter handles. Exposes stats without
316 /// locking the shard mutex.
317 counters: Vec<Arc<ShardCounters>>,
318 /// Map from shard ID to index in `shards`/`counters`.
319 shard_index: std::collections::HashMap<u16, usize>,
320}
321
322impl ShardTable {
323 fn new(shards: Vec<Shard>) -> Self {
324 let mut shard_index = std::collections::HashMap::with_capacity(shards.len());
325 let mut counters = Vec::with_capacity(shards.len());
326 let shards: Vec<_> = shards
327 .into_iter()
328 .enumerate()
329 .map(|(idx, s)| {
330 shard_index.insert(s.id, idx);
331 counters.push(s.counters());
332 Arc::new(parking_lot::Mutex::new(s))
333 })
334 .collect();
335 Self {
336 shards,
337 counters,
338 shard_index,
339 }
340 }
341}
342
343/// Manager for multiple shards.
344///
345/// The ShardManager can operate in two modes:
346/// 1. Static mode (default): Fixed number of shards, simple hash-based routing
347/// 2. Dynamic mode: Shards can be added/removed based on load, weighted routing
348pub struct ShardManager {
349 /// Routing table. Swapped atomically on scale up/down so readers
350 /// never see a partially-updated `(shards, shard_index)` pair.
351 table: arc_swap::ArcSwap<ShardTable>,
352 /// Current number of active shards.
353 num_shards: std::sync::atomic::AtomicU16,
354 /// Backpressure mode.
355 backpressure_mode: BackpressureMode,
356 /// Ring buffer capacity for new shards.
357 ring_buffer_capacity: usize,
358 /// Optional shard mapper for dynamic scaling.
359 mapper: Option<Arc<ShardMapper>>,
360 /// Serializes concurrent `add_shard` / `remove_shard` rebuilds.
361 /// Not on the ingest path.
362 rebuild_lock: parking_lot::Mutex<()>,
363 /// Events dropped because no destination shard was resolvable.
364 /// Distinct from per-shard `events_dropped` (which tracks
365 /// backpressure on a known shard) — this counts events whose
366 /// hashed shard id was missing from the routing table at lookup
367 /// time, e.g. due to a concurrent scale-down. Surfaced via
368 /// `stats().events_unrouted`.
369 events_unrouted: AtomicU64,
370}
371
372impl ShardManager {
373 /// Create a new shard manager (static mode).
374 pub fn new(
375 num_shards: u16,
376 ring_buffer_capacity: usize,
377 backpressure_mode: BackpressureMode,
378 ) -> Self {
379 let shards: Vec<Shard> = (0..num_shards)
380 .map(|id| Shard::new(id, ring_buffer_capacity))
381 .collect();
382
383 Self {
384 table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
385 num_shards: std::sync::atomic::AtomicU16::new(num_shards),
386 backpressure_mode,
387 ring_buffer_capacity,
388 mapper: None,
389 rebuild_lock: parking_lot::Mutex::new(()),
390 events_unrouted: AtomicU64::new(0),
391 }
392 }
393
394 /// Create a new shard manager with dynamic scaling enabled.
395 pub fn with_mapper(
396 num_shards: u16,
397 ring_buffer_capacity: usize,
398 backpressure_mode: BackpressureMode,
399 policy: ScalingPolicy,
400 ) -> Result<Self, ScalingError> {
401 let mapper = Arc::new(ShardMapper::new(num_shards, ring_buffer_capacity, policy)?);
402
403 let shards: Vec<Shard> = (0..num_shards)
404 .map(|id| {
405 let metrics = mapper.metrics_collector(id).ok_or_else(|| {
406 ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", id))
407 })?;
408 Ok(Shard::with_metrics(id, ring_buffer_capacity, metrics))
409 })
410 .collect::<Result<Vec<_>, ScalingError>>()?;
411
412 Ok(Self {
413 table: arc_swap::ArcSwap::from_pointee(ShardTable::new(shards)),
414 num_shards: std::sync::atomic::AtomicU16::new(num_shards),
415 backpressure_mode,
416 ring_buffer_capacity,
417 mapper: Some(mapper),
418 rebuild_lock: parking_lot::Mutex::new(()),
419 events_unrouted: AtomicU64::new(0),
420 })
421 }
422
423 /// Get the shard mapper (if dynamic scaling is enabled).
424 pub fn mapper(&self) -> Option<&Arc<ShardMapper>> {
425 self.mapper.as_ref()
426 }
427
428 /// Get the number of active shards.
429 #[inline]
430 pub fn num_shards(&self) -> u16 {
431 self.num_shards.load(std::sync::atomic::Ordering::Acquire)
432 }
433
434 /// Get the backpressure mode.
435 #[inline]
436 pub fn backpressure_mode(&self) -> BackpressureMode {
437 self.backpressure_mode
438 }
439
440 /// Select a shard for an event based on its content hash.
441 /// Uses weighted selection if dynamic scaling is enabled.
442 ///
443 /// **Prefer [`select_shard_by_hash`].** This method serializes the
444 /// `JsonValue` to bytes just to compute the hash; if you already
445 /// have a `RawEvent` (or any pre-computed `xxh3_64` of the
446 /// canonical bytes), pass that hash directly. The internal
447 /// ingest paths all do — this method exists for ad-hoc external
448 /// callers that haven't yet adopted the `RawEvent` pattern.
449 ///
450 /// [`select_shard_by_hash`]: Self::select_shard_by_hash
451 #[inline]
452 #[deprecated(
453 since = "0.10.0",
454 note = "serializes the value just to hash it; prefer `RawEvent::from_value(v).hash()` + `select_shard_by_hash` to avoid the duplicate serialization"
455 )]
456 pub fn select_shard(&self, event: &JsonValue) -> u16 {
457 // Use xxhash for fast, deterministic hashing. `to_vec` avoids the
458 // extra UTF-8 validation that `to_string` performs on the serialized
459 // buffer, since we only need the bytes for hashing.
460 let bytes = serde_json::to_vec(event).expect("Value serialization is infallible");
461 let hash = xxhash_rust::xxh3::xxh3_64(&bytes);
462 self.select_shard_by_hash(hash)
463 }
464
465 /// Select a shard using a pre-computed hash.
466 ///
467 /// This is faster than `select_shard` when you already have the hash.
468 #[inline]
469 pub fn select_shard_by_hash(&self, hash: u64) -> u16 {
470 if let Some(ref mapper) = self.mapper {
471 // Dynamic mode: use weighted selection
472 mapper.select_shard(hash)
473 } else {
474 // Static mode: simple modulo. Defensive guard against
475 // `num_shards == 0` — config validation rejects 0 at
476 // startup and `scale_down` requires `current > min_shards
477 // >= 1`, so this branch is unreachable today, but a stray
478 // 0 here would otherwise panic on the `%` below.
479 let num_shards = self.num_shards.load(std::sync::atomic::Ordering::Acquire);
480 debug_assert!(num_shards > 0, "num_shards must be > 0");
481 if num_shards == 0 {
482 return 0;
483 }
484 (hash % num_shards as u64) as u16
485 }
486 }
487
488 /// Resolve a shard ID to its table index, using the fast path in
489 /// static mode (shard_id == index).
490 #[inline]
491 fn resolve_idx(&self, table: &ShardTable, shard_id: u16) -> Option<usize> {
492 if self.mapper.is_none() {
493 Some(shard_id as usize)
494 } else {
495 table.shard_index.get(&shard_id).copied()
496 }
497 }
498
499 /// Push `raw` into `shard`, handling backpressure. Only clones the
500 /// bytes when `DropOldest` needs them for the retry path.
501 #[inline]
502 fn push_with_backpressure(
503 &self,
504 shard: &mut Shard,
505 shard_id: u16,
506 raw: Bytes,
507 ) -> Result<(u16, u64), IngestionError> {
508 match self.backpressure_mode {
509 BackpressureMode::DropOldest => match shard.try_push_raw(raw.clone()) {
510 Ok(ts) => Ok((shard_id, ts)),
511 Err(IngestionError::Backpressure) => {
512 // The failed try_push_raw incremented events_dropped for
513 // the *new* event, but the new event isn't actually
514 // dropped — the oldest is. Correct the stats: undo the
515 // spurious drop count, evict the oldest (which is the real
516 // drop), and retry with the same ref-counted bytes.
517 //
518 // Use the producer-side `evict_oldest` rather
519 // than `try_pop`. Calling `try_pop` from the
520 // producer thread would violate the SPSC consumer
521 // contract (the
522 // legitimate consumer is the batch worker, on a
523 // different task / thread).
524 //
525 // Transient stats note: a concurrent reader of
526 // `manager.stats().events_dropped` between the
527 // `fetch_sub` and the second `fetch_add` would
528 // briefly observe the pre-correction value
529 // (one less than reality). The net delta over
530 // the whole retry is `+1`, matching the real
531 // drop. Documented as snapshot-not-coherent
532 // per `ShardCounters::snapshot`'s contract.
533 shard
534 .counters
535 .events_dropped
536 .fetch_sub(1, AtomicOrdering::Relaxed);
537 let _ = shard.evict_oldest();
538 shard
539 .counters
540 .events_dropped
541 .fetch_add(1, AtomicOrdering::Relaxed);
542 shard.try_push_raw(raw).map(|ts| (shard_id, ts))
543 }
544 Err(e) => Err(e),
545 },
546 BackpressureMode::Sample { .. } => match shard.try_push_raw(raw) {
547 Ok(ts) => Ok((shard_id, ts)),
548 Err(IngestionError::Backpressure) => Err(IngestionError::Sampled),
549 Err(e) => Err(e),
550 },
551 BackpressureMode::DropNewest | BackpressureMode::FailProducer => {
552 shard.try_push_raw(raw).map(|ts| (shard_id, ts))
553 }
554 }
555 }
556
557 /// Ingest an event into the appropriate shard.
558 pub fn ingest(&self, event: JsonValue) -> Result<(u16, u64), IngestionError> {
559 // Serialize once upfront - avoids clone on retry
560 let raw = Bytes::from(serde_json::to_vec(&event)?);
561 let hash = xxhash_rust::xxh3::xxh3_64(&raw);
562 let shard_id = self.select_shard_by_hash(hash);
563
564 let table = self.table.load();
565 // Surface "no routable destination" as `Unrouted` (not
566 // `Backpressure`) and bump the manager-level
567 // `events_unrouted` counter so per-event vs. batch-path
568 // accounting agree. The secondary `table.shards.get(idx)`
569 // miss should be impossible by the `shard_index ↔ shards`
570 // invariant — keep returning `Unrouted` defensively rather
571 // than panicking.
572 let Some(idx) = self.resolve_idx(&table, shard_id) else {
573 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
574 return Err(IngestionError::Unrouted);
575 };
576 let Some(shard_lock) = table.shards.get(idx) else {
577 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
578 return Err(IngestionError::Unrouted);
579 };
580
581 let mut shard = shard_lock.lock();
582 self.push_with_backpressure(&mut shard, shard_id, raw)
583 }
584
585 /// Ingest a raw event (pre-serialized with cached hash).
586 ///
587 /// This is the fastest ingestion path:
588 /// - Uses pre-computed hash for shard selection (no serialization)
589 /// - Stores bytes directly (no clone needed, reference-counted)
590 #[inline]
591 pub fn ingest_raw(&self, event: RawEvent) -> Result<(u16, u64), IngestionError> {
592 let shard_id = self.select_shard_by_hash(event.hash());
593
594 let table = self.table.load();
595 // See `ingest` above for the `Unrouted` rationale.
596 let Some(idx) = self.resolve_idx(&table, shard_id) else {
597 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
598 return Err(IngestionError::Unrouted);
599 };
600 let Some(shard_lock) = table.shards.get(idx) else {
601 self.events_unrouted.fetch_add(1, AtomicOrdering::Relaxed);
602 return Err(IngestionError::Unrouted);
603 };
604
605 let mut shard = shard_lock.lock();
606 self.push_with_backpressure(&mut shard, shard_id, event.bytes())
607 }
608
609 /// Ingest a batch of pre-serialized events, grouped by shard.
610 ///
611 /// Each destination shard's mutex is acquired once and all of that
612 /// shard's events are pushed before releasing. With a uniform hash
613 /// distribution this amortizes lock acquisitions from O(events) to
614 /// O(shards). Backpressure semantics match per-event `ingest_raw`.
615 ///
616 /// Returns `(success, unrouted)` where `success` is the count of
617 /// events successfully pushed onto a shard's ring buffer and
618 /// `unrouted` is the count of events whose destination shard was
619 /// not present in the routing table at the time of dispatch
620 /// (e.g. concurrent scale-down). The remainder
621 /// (`total - success - unrouted`) is the backpressure-class drop
622 /// count.
623 ///
624 /// Returns `(success, unrouted)` rather than just `success`
625 /// so the bus can subtract `unrouted` before publishing
626 /// `events_dropped`. Returning only `success` would let the
627 /// bus's `dropped = total - success` accounting double-count
628 /// unrouted events — they're already tallied on
629 /// `events_unrouted` inside this function.
630 pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> (usize, usize) {
631 if events.is_empty() {
632 return (0, 0);
633 }
634
635 let table = self.table.load();
636
637 // Bucket by table index. Using a Vec<Vec<_>> keyed by index is
638 // cheaper than a HashMap for the common case of a small
639 // shard count.
640 let mut groups: Vec<Vec<Bytes>> = (0..table.shards.len()).map(|_| Vec::new()).collect();
641 let mut group_ids: Vec<u16> = vec![0; groups.len()];
642
643 let mut unrouted = 0usize;
644 for event in events {
645 let shard_id = self.select_shard_by_hash(event.hash());
646 let Some(idx) = self.resolve_idx(&table, shard_id) else {
647 // Routing table doesn't contain the chosen shard
648 // (e.g. concurrent scale-down removed it). The drop
649 // can't be attributed to a per-shard counter; track
650 // it on the manager-level `events_unrouted` so
651 // bus-level vs. per-shard reconciliation is exact.
652 unrouted += 1;
653 continue;
654 };
655 if let Some(g) = groups.get_mut(idx) {
656 if g.is_empty() {
657 group_ids[idx] = shard_id;
658 }
659 g.push(event.bytes());
660 }
661 }
662 if unrouted > 0 {
663 self.events_unrouted
664 .fetch_add(unrouted as u64, AtomicOrdering::Relaxed);
665 }
666
667 let mut success = 0usize;
668 for (idx, group) in groups.into_iter().enumerate() {
669 if group.is_empty() {
670 continue;
671 }
672 let shard_id = group_ids[idx];
673 let Some(shard_lock) = table.shards.get(idx) else {
674 continue;
675 };
676 let mut shard = shard_lock.lock();
677 for bytes in group {
678 if self
679 .push_with_backpressure(&mut shard, shard_id, bytes)
680 .is_ok()
681 {
682 success += 1;
683 }
684 }
685 }
686
687 (success, unrouted)
688 }
689
690 /// Get a reference to a shard by ID.
691 pub fn shard(&self, id: u16) -> Option<ShardRef> {
692 let table = self.table.load();
693 let idx = self.resolve_idx(&table, id)?;
694 let shard = table.shards.get(idx)?.clone();
695 Some(ShardRef { shard })
696 }
697
698 /// Execute a function with exclusive access to a shard.
699 pub fn with_shard<F, R>(&self, id: u16, f: F) -> Option<R>
700 where
701 F: FnOnce(&mut Shard) -> R,
702 {
703 let table = self.table.load();
704 let idx = self.resolve_idx(&table, id)?;
705 table.shards.get(idx).map(|shard_lock| {
706 let mut shard = shard_lock.lock();
707 f(&mut shard)
708 })
709 }
710
711 /// Returns true if every shard's ring buffer is empty.
712 ///
713 /// Cheaper than `shard_ids()` + repeated `with_shard`: loads the
714 /// routing table once and checks each shard behind a brief lock.
715 pub fn all_shards_empty(&self) -> bool {
716 let table = self.table.load();
717 table.shards.iter().all(|s| s.lock().is_empty())
718 }
719
720 /// Iterate over all active shard IDs.
721 pub fn shard_ids(&self) -> Vec<u16> {
722 self.table.load().shard_index.keys().copied().collect()
723 }
724
725 /// Sum of `len()` across every shard's ring buffer.
726 pub fn total_pending_in_rings(&self) -> u64 {
727 let table = self.table.load();
728 table.shards.iter().map(|s| s.lock().len() as u64).sum()
729 }
730
731 /// Best-effort variant of [`Self::total_pending_in_rings`] that
732 /// never blocks: every shard whose mutex is currently held is
733 /// skipped (counted as zero). Use this from `Drop` or any path
734 /// that may run on a thread already holding a shard lock
735 /// (single-thread runtime + panic during shutdown is the
736 /// canonical hazard); the blocking variant would self-deadlock
737 /// there.
738 ///
739 /// Returns `(sum_counted, uncounted_shard_count)` so the caller
740 /// can log the uncertainty in the result.
741 pub fn try_total_pending_in_rings(&self) -> (u64, usize) {
742 let table = self.table.load();
743 let mut sum: u64 = 0;
744 let mut uncounted: usize = 0;
745 for s in table.shards.iter() {
746 match s.try_lock() {
747 Some(guard) => sum += guard.len() as u64,
748 None => uncounted += 1,
749 }
750 }
751 (sum, uncounted)
752 }
753
754 /// Get aggregated statistics from all shards.
755 ///
756 /// Lock-free: reads each shard's atomic counters directly via the
757 /// parallel `counters` vector on the routing table, with no per-
758 /// shard mutex acquisition. `events_unrouted` is sourced from the
759 /// `ShardManager` itself rather than the per-shard counters since
760 /// unrouted events have no shard to attribute to.
761 pub fn stats(&self) -> ShardStats {
762 let table = self.table.load();
763 let mut total = ShardStats::default();
764 for counters in table.counters.iter() {
765 let snap = counters.snapshot();
766 total.events_ingested += snap.events_ingested;
767 total.events_dropped += snap.events_dropped;
768 total.batches_dispatched += snap.batches_dispatched;
769 }
770 total.events_unrouted = self.events_unrouted.load(AtomicOrdering::Relaxed);
771 total
772 }
773
774 /// Rebuild the routing table with a closure that sees the old
775 /// `(shards, counters, shard_index)` and produces the new ones.
776 /// Serialized by `rebuild_lock` so concurrent scaling operations
777 /// can't race on read-modify-write of the table.
778 fn rebuild_table<F>(&self, f: F)
779 where
780 F: FnOnce(
781 &Vec<Arc<parking_lot::Mutex<Shard>>>,
782 &Vec<Arc<ShardCounters>>,
783 &std::collections::HashMap<u16, usize>,
784 ) -> ShardTable,
785 {
786 let _guard = self.rebuild_lock.lock();
787 let old = self.table.load();
788 let new = f(&old.shards, &old.counters, &old.shard_index);
789 self.table.store(Arc::new(new));
790 }
791
792 /// Add a new shard (for dynamic scaling).
793 /// Returns the new shard ID. The shard is in the routing table
794 /// and ready to be the destination of `select_shard` calls
795 /// **only after** [`activate_shard`] is called for it.
796 ///
797 /// Previously the mapper marked the shard `Active` *before* the
798 /// routing table was rebuilt and *before* any worker was wired up
799 /// to drain its ring buffer. Producers could `select_shard` to
800 /// the new id, push into its ring buffer, and have the events
801 /// stranded with no consumer. The fix uses
802 /// `scale_up_provisioning` so the mapper records the shard but
803 /// `select_shard` skips it, then `activate_shard` flips it to
804 /// `Active` once workers are ready.
805 ///
806 /// [`activate_shard`]: Self::activate_shard
807 pub fn add_shard(&self) -> Result<u16, ScalingError> {
808 self.add_shard_inner(false)
809 }
810
811 /// Like [`add_shard`] but bypasses the auto-scaling cooldown.
812 ///
813 /// Used by operator-initiated `manual_scale_up` paths. The
814 /// auto-scaling cooldown protects against the auto-scaling
815 /// monitor reacting too quickly to transient load spikes;
816 /// a deliberate operator action should not be rate-limited
817 /// by that cadence. The `max_shards` budget check still
818 /// applies.
819 ///
820 /// [`add_shard`]: Self::add_shard
821 pub fn add_shard_force(&self) -> Result<u16, ScalingError> {
822 self.add_shard_inner(true)
823 }
824
825 fn add_shard_inner(&self, force: bool) -> Result<u16, ScalingError> {
826 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
827 "Dynamic scaling not enabled".into(),
828 ))?;
829
830 // Allocate the shard in `Provisioning` state — not yet
831 // selectable.
832 let new_ids = if force {
833 mapper.scale_up_provisioning_force(1)?
834 } else {
835 mapper.scale_up_provisioning(1)?
836 };
837 let new_id = new_ids[0];
838
839 let metrics = mapper.metrics_collector(new_id).ok_or_else(|| {
840 ScalingError::InvalidPolicy(format!("no metrics collector for shard {}", new_id))
841 })?;
842 let new_shard = Shard::with_metrics(new_id, self.ring_buffer_capacity, metrics);
843 let new_counters = new_shard.counters();
844 let new_shard = Arc::new(parking_lot::Mutex::new(new_shard));
845
846 // Publish to the routing table so `with_shard` works (the
847 // drain worker the caller is about to spawn needs this) but
848 // the shard is still `Provisioning` so `select_shard` will
849 // not route producer pushes to it yet.
850 self.rebuild_table(|shards, counters, shard_index| {
851 let mut shards = shards.clone();
852 let mut counters = counters.clone();
853 let mut shard_index = shard_index.clone();
854 let idx = shards.len();
855 shards.push(new_shard.clone());
856 counters.push(new_counters.clone());
857 shard_index.insert(new_id, idx);
858 ShardTable {
859 shards,
860 counters,
861 shard_index,
862 }
863 });
864
865 // Don't bump `num_shards` yet — `activate_shard` does that
866 // when the shard becomes selectable.
867 Ok(new_id)
868 }
869
870 /// Activate a previously-provisioned shard. After this returns,
871 /// `select_shard` will route to the shard and producer pushes
872 /// will land in its ring buffer.
873 ///
874 /// Idempotent: calling on an already-`Active` shard is `Ok(())`.
875 ///
876 /// Pre-fix this unconditionally `fetch_add(1)`d
877 /// `num_shards` even when the mapper's `activate()` early-
878 /// returned for an already-`Active` shard. After repeated
879 /// activate calls, `num_shards` exceeded both the mapper's
880 /// `active_count` and the actual shard count, breaking
881 /// modulo-based shard selection (`select_shard`) and
882 /// producing stale routing decisions. Post-fix gates the
883 /// `fetch_add` on the mapper's transition signal.
884 pub fn activate_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
885 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
886 "Dynamic scaling not enabled".into(),
887 ))?;
888 let transitioned = mapper.activate(shard_id)?;
889 if transitioned {
890 self.num_shards
891 .fetch_add(1, std::sync::atomic::Ordering::Release);
892 }
893 Ok(())
894 }
895
896 /// Start draining a shard (for dynamic scaling).
897 ///
898 /// Previously only flipped the metrics collector's `draining`
899 /// atomic, leaving `MappedShard.state` untouched. Result:
900 /// `select_shard` (which filters on `state == Active`) still
901 /// routed new producers to the shard. The fix calls into the
902 /// mapper, which atomically transitions the state to `Draining`
903 /// and (for accounting) decrements `active_count`, mirroring
904 /// `scale_down(N)` for a single targeted shard.
905 pub fn drain_shard(&self, shard_id: u16) -> Result<(), ScalingError> {
906 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
907 "Dynamic scaling not enabled".into(),
908 ))?;
909 mapper.drain_specific(shard_id)
910 }
911
912 /// Remove a shard from the routing table.
913 ///
914 /// Previously this only unmapped the shard from the routing
915 /// table. The drain worker, on its next `with_shard` call,
916 /// observed `None` and exited — leaving any events still in the
917 /// ring buffer permanently stranded. The fix drains the ring
918 /// buffer into a caller-supplied scratch `Vec` **before** the
919 /// unmap, then returns the drained events so the caller
920 /// (typically `EventBus::remove_shard_internal`) can flush them
921 /// through to the adapter rather than dropping them.
922 ///
923 /// Returns `Ok(events)` where `events` is whatever was still
924 /// queued in the ring buffer at unmap time (possibly empty).
925 /// Caller is responsible for handing those off to the adapter.
926 pub fn remove_shard(
927 &self,
928 shard_id: u16,
929 ) -> Result<Vec<crate::event::InternalEvent>, ScalingError> {
930 let mapper = self.mapper.as_ref().ok_or(ScalingError::InvalidPolicy(
931 "Dynamic scaling not enabled".into(),
932 ))?;
933
934 // Capture the mapper-side state *before* we unmap. This
935 // gates the `num_shards` decrement at the end so it stays
936 // symmetric with `activate_shard`'s `fetch_add`. The
937 // activate-failure rollback path (`bus.rs`) calls us on a
938 // shard that's still `Provisioning` — `add_shard` never
939 // bumped `num_shards` for it, so an unconditional
940 // `fetch_sub` here would leave the counter one below the
941 // table's actual size, breaking modulo-based shard
942 // selection. `Active` / `Draining` / `Stopped` shards all
943 // had `activate_shard` succeed against them at some point
944 // (it's the only way out of `Provisioning`), so they did
945 // bump `num_shards` and must decrement here.
946 let was_activated = matches!(
947 mapper.shard_state(shard_id),
948 Some(ShardState::Active) | Some(ShardState::Draining) | Some(ShardState::Stopped)
949 );
950
951 // Drain whatever is left in the ring buffer before unmapping.
952 // `with_shard` returns `None` once the shard is gone, so we
953 // do this *before* `rebuild_table`. We cap drain to a sane
954 // upper bound (`ring_buffer_capacity`) so a malformed shard
955 // can't pin us here forever.
956 let cap = self.ring_buffer_capacity;
957 let drained: Vec<crate::event::InternalEvent> = self
958 .with_shard(shard_id, |shard| {
959 let mut buf = Vec::with_capacity(shard.len().min(cap));
960 shard.pop_batch_into(&mut buf, cap);
961 buf
962 })
963 .unwrap_or_default();
964
965 let mut removed = false;
966 self.rebuild_table(|shards, counters, shard_index| {
967 let mut shards = shards.clone();
968 let mut counters = counters.clone();
969 let mut shard_index = shard_index.clone();
970
971 if let Some(idx) = shard_index.remove(&shard_id) {
972 removed = true;
973 shards.swap_remove(idx);
974 counters.swap_remove(idx);
975 // swap_remove moved the last element into `idx`: update its
976 // index mapping.
977 if idx < shards.len() {
978 let moved_shard_id = shards[idx].lock().id;
979 shard_index.insert(moved_shard_id, idx);
980 }
981 }
982
983 ShardTable {
984 shards,
985 counters,
986 shard_index,
987 }
988 });
989
990 if removed && was_activated {
991 self.num_shards
992 .fetch_sub(1, std::sync::atomic::Ordering::Release);
993 }
994
995 // Ask the mapper to drop the corresponding `MappedShard`
996 // record. Without this sweep the mapper's
997 // `shards: RwLock<Vec<MappedShard>>` would keep growing
998 // across scale-up/down cycles (every scale-up appends a
999 // fresh entry; `Stopped` entries are only removed by an
1000 // explicit `remove_specific_stopped_shard` /
1001 // `remove_stopped_shards` call). `evaluate_scaling`
1002 // filters by state but still iterates the full list, so
1003 // per-tick cost would grow with cumulative scaling history.
1004 //
1005 // The scaling monitor calls `mapper.finalize_draining()`
1006 // before invoking `bus.remove_shard_internal(id)` (which is
1007 // what calls us), so by the time we run the matching
1008 // `MappedShard` is already in `Stopped` state. We prune
1009 // ONLY this shard here, not every Stopped one — a bulk
1010 // sweep would prune sibling Stopped shards that a
1011 // sequential `manual_scale_down` is about to look up
1012 // state for in its next iteration's `remove_shard`. Once
1013 // the mapper had `None` for a sibling shard, the
1014 // `was_activated` gate above would observe it as
1015 // never-activated and skip the `num_shards` decrement,
1016 // leaving the counter one below the actual table size.
1017 mapper.remove_specific_stopped_shard(shard_id);
1018
1019 Ok(drained)
1020 }
1021
1022 /// Collect metrics from all shards (for dynamic scaling decisions).
1023 pub fn collect_metrics(&self) -> Option<Vec<ShardMetrics>> {
1024 self.mapper.as_ref().map(|m| m.collect_metrics())
1025 }
1026
1027 /// Evaluate and optionally execute scaling.
1028 pub fn evaluate_scaling(&self) -> ScalingDecision {
1029 self.mapper
1030 .as_ref()
1031 .map(|m| m.evaluate_scaling())
1032 .unwrap_or(ScalingDecision::None)
1033 }
1034}
1035
1036/// An owned handle to a shard. Holding this does not block scaling
1037/// operations; the shard stays alive via `Arc` refcount even if
1038/// removed from the table.
1039pub struct ShardRef {
1040 shard: Arc<parking_lot::Mutex<Shard>>,
1041}
1042
1043impl ShardRef {
1044 /// Lock the shard for exclusive access.
1045 pub fn lock(&self) -> parking_lot::MutexGuard<'_, Shard> {
1046 self.shard.lock()
1047 }
1048}
1049
1050#[cfg(test)]
1051mod tests {
1052 use super::*;
1053 use serde_json::json;
1054
1055 #[test]
1056 fn test_shard_push_pop() {
1057 let mut shard = Shard::new(0, 1024);
1058
1059 let ts = shard.try_push(json!({"test": 1})).unwrap();
1060 assert!(ts > 0);
1061 assert_eq!(shard.len(), 1);
1062
1063 let event = shard.try_pop().unwrap();
1064 assert_eq!(event.shard_id, 0);
1065 assert_eq!(event.insertion_ts, ts);
1066 assert!(shard.is_empty());
1067 }
1068
1069 /// A `Shard` configured with a `ShardMetricsCollector` must feed every
1070 /// successful push into the collector so the dynamic-scaling and
1071 /// drain-finalize paths see non-zero counters. Without this wiring
1072 /// `evaluate_scaling` reads `fill_ratio == 0` for every shard and
1073 /// `finalize_draining`'s "is the ring actually empty" predicate is a
1074 /// no-op (it sees `pushes_since_drain_start == 0` regardless of
1075 /// contents).
1076 #[test]
1077 fn try_push_feeds_metrics_collector() {
1078 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1079 let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1080
1081 for i in 0..16 {
1082 shard.try_push(json!({"i": i})).unwrap();
1083 }
1084
1085 let metrics = collector.collect_and_reset();
1086 assert_eq!(
1087 metrics.event_rate, 16,
1088 "every push must increment event_rate"
1089 );
1090 assert!(metrics.fill_ratio > 0.0, "buffer length must be observable");
1091 assert!(
1092 metrics.avg_push_latency_ns > 0,
1093 "push latency must be recorded"
1094 );
1095 }
1096
1097 /// `try_total_pending_in_rings` must never block, must skip
1098 /// shards whose mutex is currently held, and must report how
1099 /// many it skipped. This is what makes `EventBus::Drop`
1100 /// safe to call on a thread that already holds a shard lock.
1101 #[test]
1102 fn try_total_pending_in_rings_skips_held_shards() {
1103 let manager = ShardManager::new(2, 1024, BackpressureMode::DropNewest);
1104 // Push some events so a non-zero count is observable.
1105 manager.ingest(json!({"i": 1})).unwrap();
1106 manager.ingest(json!({"i": 2})).unwrap();
1107 manager.ingest(json!({"i": 3})).unwrap();
1108
1109 // Uncontended: all shards counted, uncounted_shards == 0.
1110 let (sum, uncounted) = manager.try_total_pending_in_rings();
1111 assert_eq!(uncounted, 0);
1112 let baseline_sum = sum;
1113 assert!(baseline_sum > 0, "events should be pending in some shard");
1114
1115 // Hold one shard's mutex and re-check: that shard must be
1116 // skipped, uncounted must be 1, and the call must return
1117 // immediately (this test would hang on the blocking
1118 // `total_pending_in_rings` variant).
1119 let table = manager.table.load();
1120 let _guard = table.shards[0].lock();
1121 let (sum2, uncounted2) = manager.try_total_pending_in_rings();
1122 assert_eq!(uncounted2, 1, "the locked shard must be uncounted");
1123 assert!(
1124 sum2 <= baseline_sum,
1125 "sum must not include events from the locked shard"
1126 );
1127 }
1128
1129 /// Same wiring for `try_push_raw` — the byte-oriented hot path.
1130 #[test]
1131 fn try_push_raw_feeds_metrics_collector() {
1132 let collector = Arc::new(ShardMetricsCollector::new(0, 1024));
1133 let mut shard = Shard::with_metrics(0, 1024, Arc::clone(&collector));
1134
1135 for i in 0..16 {
1136 shard
1137 .try_push_raw(bytes::Bytes::from(format!("event-{i}")))
1138 .unwrap();
1139 }
1140
1141 let metrics = collector.collect_and_reset();
1142 assert_eq!(metrics.event_rate, 16);
1143 assert!(metrics.fill_ratio > 0.0);
1144 assert!(metrics.avg_push_latency_ns > 0);
1145 }
1146
1147 #[test]
1148 #[allow(deprecated)] // exercises the deprecated `select_shard` path
1149 fn test_shard_manager_routing() {
1150 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1151
1152 // Same event should always go to the same shard
1153 let event = json!({"key": "value"});
1154 let shard1 = manager.select_shard(&event);
1155 let shard2 = manager.select_shard(&event);
1156 assert_eq!(shard1, shard2);
1157
1158 // Different events may go to different shards
1159 let events: Vec<_> = (0..100).map(|i| json!({"i": i})).collect();
1160 let shards: std::collections::HashSet<_> =
1161 events.iter().map(|e| manager.select_shard(e)).collect();
1162
1163 // With 100 random events and 4 shards, we should hit multiple shards
1164 assert!(shards.len() > 1);
1165 }
1166
1167 /// Regression: the deprecated `select_shard(&JsonValue)` must produce
1168 /// the same shard id as `select_shard_by_hash` would for the
1169 /// equivalent `RawEvent`. They share underlying logic now, but if a
1170 /// future refactor splits them this test catches the divergence
1171 /// before consumers do.
1172 #[test]
1173 #[allow(deprecated)]
1174 fn test_select_shard_matches_select_shard_by_hash() {
1175 let manager = ShardManager::new(8, 1024, BackpressureMode::DropNewest);
1176 for i in 0..200 {
1177 let v = json!({"i": i, "tag": format!("user-{i}")});
1178 let raw = RawEvent::from_value(v.clone());
1179 assert_eq!(
1180 manager.select_shard(&v),
1181 manager.select_shard_by_hash(raw.hash()),
1182 "select_shard and select_shard_by_hash must agree (i={i})"
1183 );
1184 }
1185 }
1186
1187 #[test]
1188 fn test_shard_manager_ingest() {
1189 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1190
1191 for i in 0..100 {
1192 let event = json!({"i": i});
1193 let result = manager.ingest(event);
1194 assert!(result.is_ok());
1195 }
1196
1197 let stats = manager.stats();
1198 assert_eq!(stats.events_ingested, 100);
1199 assert_eq!(stats.events_dropped, 0);
1200 }
1201
1202 #[test]
1203 fn test_backpressure_drop_newest() {
1204 let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1205
1206 // Fill the buffer (capacity 4, usable 3)
1207 for i in 0..3 {
1208 manager.ingest(json!({"i": i})).unwrap();
1209 }
1210
1211 // Next insert should fail
1212 let result = manager.ingest(json!({"i": 999}));
1213 assert!(matches!(result, Err(IngestionError::Backpressure)));
1214
1215 let stats = manager.stats();
1216 assert_eq!(stats.events_ingested, 3);
1217 assert_eq!(stats.events_dropped, 1);
1218 }
1219
1220 #[test]
1221 fn test_backpressure_drop_oldest() {
1222 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1223
1224 // Fill the buffer
1225 for i in 0..3 {
1226 manager.ingest(json!({"i": i})).unwrap();
1227 }
1228
1229 // Next insert should succeed by dropping oldest
1230 let result = manager.ingest(json!({"i": 999}));
1231 assert!(result.is_ok());
1232
1233 // Verify the oldest was dropped
1234 let shard = manager.shard(0).unwrap();
1235 let events = shard.lock().pop_batch(10);
1236
1237 // Should have events 1, 2, 999 (0 was dropped)
1238 assert_eq!(events.len(), 3);
1239 assert_eq!(events[0].parse().unwrap(), json!({"i": 1}));
1240 assert_eq!(events[2].parse().unwrap(), json!({"i": 999}));
1241 }
1242
1243 #[test]
1244 fn test_raw_event_ingestion() {
1245 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1246
1247 for i in 0..100 {
1248 let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1249 let result = manager.ingest_raw(raw);
1250 assert!(result.is_ok());
1251 }
1252
1253 let stats = manager.stats();
1254 assert_eq!(stats.events_ingested, 100);
1255 assert_eq!(stats.events_dropped, 0);
1256 }
1257
1258 /// `ingest_raw_batch` groups events by destination shard before
1259 /// pushing — verify the grouping preserves FIFO within a shard,
1260 /// honors hash-based routing, and that totals match `ingest_raw`.
1261 #[test]
1262 fn test_ingest_raw_batch_routes_and_preserves_order() {
1263 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1264 let events: Vec<RawEvent> = (0..200)
1265 .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1266 .collect();
1267
1268 // Snapshot the expected destination for each event so we can
1269 // compare against what actually landed in each shard.
1270 let expected_dests: Vec<u16> = events
1271 .iter()
1272 .map(|e| manager.select_shard_by_hash(e.hash()))
1273 .collect();
1274
1275 let (success, unrouted) = manager.ingest_raw_batch(events.clone());
1276 assert_eq!(success, 200, "all events should land with ample capacity");
1277 assert_eq!(unrouted, 0, "no scale-down so no unrouted events");
1278
1279 // Aggregate totals must match.
1280 let stats = manager.stats();
1281 assert_eq!(stats.events_ingested, 200);
1282 assert_eq!(stats.events_dropped, 0);
1283
1284 // Per-shard totals must match the expected routing distribution,
1285 // and the distribution must span more than one shard (otherwise
1286 // the test wouldn't exercise the grouping path).
1287 let mut expected_by_shard: std::collections::HashMap<u16, u64> =
1288 std::collections::HashMap::new();
1289 for d in &expected_dests {
1290 *expected_by_shard.entry(*d).or_default() += 1;
1291 }
1292 assert!(
1293 expected_by_shard.len() > 1,
1294 "hash distribution should span multiple shards"
1295 );
1296 for shard_id in 0..4u16 {
1297 let got = manager
1298 .with_shard(shard_id, |s| s.stats().events_ingested)
1299 .unwrap();
1300 let want = expected_by_shard.get(&shard_id).copied().unwrap_or(0);
1301 assert_eq!(got, want, "shard {} ingested count mismatch", shard_id);
1302 }
1303
1304 // FIFO within a shard: the events a shard received, in the order
1305 // we batched them, must come out of the ring buffer in the same
1306 // order.
1307 for shard_id in 0..4u16 {
1308 let expected_payloads: Vec<&[u8]> = events
1309 .iter()
1310 .zip(expected_dests.iter())
1311 .filter(|(_, d)| **d == shard_id)
1312 .map(|(e, _)| e.as_bytes())
1313 .collect();
1314 let popped = manager.with_shard(shard_id, |s| s.pop_batch(1024)).unwrap();
1315 assert_eq!(popped.len(), expected_payloads.len());
1316 for (i, ev) in popped.iter().enumerate() {
1317 assert_eq!(
1318 ev.as_bytes(),
1319 expected_payloads[i],
1320 "shard {} position {} out of order",
1321 shard_id,
1322 i
1323 );
1324 }
1325 }
1326 }
1327
1328 /// Batching past a shard's capacity must account every dropped
1329 /// event under `DropNewest`: `success` + `events_dropped` =
1330 /// `len(input)`.
1331 #[test]
1332 fn test_ingest_raw_batch_drop_accounting() {
1333 // Single shard, usable capacity 3 (ring buffer reserves one slot).
1334 let manager = ShardManager::new(1, 4, BackpressureMode::DropNewest);
1335 let events: Vec<RawEvent> = (0..10)
1336 .map(|i| RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)))
1337 .collect();
1338
1339 let (success, unrouted) = manager.ingest_raw_batch(events);
1340 assert_eq!(success, 3, "only 3 should fit under DropNewest");
1341 assert_eq!(unrouted, 0, "single-shard config has no unrouted events");
1342
1343 let stats = manager.stats();
1344 assert_eq!(stats.events_ingested, 3);
1345 assert_eq!(stats.events_dropped, 7);
1346 }
1347
1348 /// Empty batch is a no-op and must not touch stats.
1349 #[test]
1350 fn test_ingest_raw_batch_empty() {
1351 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1352 assert_eq!(manager.ingest_raw_batch(Vec::new()), (0, 0));
1353 let stats = manager.stats();
1354 assert_eq!(stats.events_ingested, 0);
1355 assert_eq!(stats.events_dropped, 0);
1356 }
1357
1358 #[test]
1359 fn test_remove_shard_requires_dynamic_scaling() {
1360 // Static mode - no dynamic scaling
1361 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1362
1363 // Should fail because dynamic scaling is not enabled
1364 let result = manager.remove_shard(0);
1365 assert!(result.is_err());
1366 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1367 }
1368
1369 #[test]
1370 fn test_add_shard_requires_dynamic_scaling() {
1371 // Static mode - no dynamic scaling
1372 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1373
1374 // Should fail because dynamic scaling is not enabled
1375 let result = manager.add_shard();
1376 assert!(result.is_err());
1377 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1378 }
1379
1380 #[test]
1381 fn test_drain_shard_requires_dynamic_scaling() {
1382 // Static mode - no dynamic scaling
1383 let manager = ShardManager::new(4, 1024, BackpressureMode::DropNewest);
1384
1385 // Should fail because dynamic scaling is not enabled
1386 let result = manager.drain_shard(0);
1387 assert!(result.is_err());
1388 assert!(matches!(result, Err(ScalingError::InvalidPolicy(_))));
1389 }
1390
1391 #[test]
1392 fn test_drop_oldest_counts_dropped_events() {
1393 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1394
1395 // Fill the buffer (capacity 4, usable 3)
1396 for i in 0..3 {
1397 manager.ingest(json!({"i": i})).unwrap();
1398 }
1399
1400 // This should succeed by dropping the oldest event
1401 manager.ingest(json!({"i": 999})).unwrap();
1402
1403 let stats = manager.stats();
1404 assert_eq!(stats.events_ingested, 4);
1405 // The initial push fails (counted as dropped), then retry succeeds
1406 assert_eq!(
1407 stats.events_dropped, 1,
1408 "DropOldest cycle should count exactly one drop"
1409 );
1410 }
1411
1412 #[test]
1413 fn test_drop_oldest_raw_counts_dropped_events() {
1414 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1415
1416 // Fill the buffer
1417 for i in 0..3 {
1418 let raw = RawEvent::from_str(&format!(r#"{{"i": {}}}"#, i));
1419 manager.ingest_raw(raw).unwrap();
1420 }
1421
1422 // This should succeed by dropping the oldest event
1423 let raw = RawEvent::from_str(r#"{"i": 999}"#);
1424 manager.ingest_raw(raw).unwrap();
1425
1426 let stats = manager.stats();
1427 assert_eq!(stats.events_ingested, 4);
1428 assert_eq!(
1429 stats.events_dropped, 1,
1430 "DropOldest cycle should count exactly one drop"
1431 );
1432 }
1433
1434 /// Pin the current contract for `BackpressureMode::Sample`:
1435 /// it returns `IngestionError::Sampled` once the buffer fills,
1436 /// indistinguishable in shape from a `Backpressure` rejection.
1437 /// Sampling itself ("keep 1 in N events") is **not implemented**
1438 /// — the comments in `ingest` / `ingest_raw` defer it to "a
1439 /// higher level" that does not exist. A consumer setting this
1440 /// mode today gets a rejection signal, never probabilistic
1441 /// admission.
1442 ///
1443 /// This test pins that contract so it cannot quietly change
1444 /// without an explicit decision. If sampling is ever wired up,
1445 /// this test will fail and force an update — at which point
1446 /// the implementer should also add coverage for the
1447 /// rate-proportional admission rate.
1448 #[test]
1449 fn sample_mode_currently_returns_sampled_after_buffer_fills() {
1450 // TODO(coverage round 2): `BackpressureMode::Sample` is
1451 // dead-on-arrival until "higher level" sampling lands;
1452 // see comments at `ShardManager::ingest` / `ingest_raw`.
1453 let manager = ShardManager::new(1, 4, BackpressureMode::Sample { rate: 2 });
1454
1455 // Fill the buffer (capacity 4, usable 3).
1456 for i in 0..3 {
1457 manager.ingest(json!({"i": i})).unwrap();
1458 }
1459
1460 // Both ingest paths must report `Sampled` — not `Backpressure`,
1461 // not `Ok` — so callers can distinguish the (currently
1462 // unused) sampling rejection from a hard backpressure
1463 // rejection in case sampling is wired up later.
1464 let json_result = manager.ingest(json!({"i": 999}));
1465 assert!(
1466 matches!(json_result, Err(IngestionError::Sampled)),
1467 "Sample mode must return Sampled on a full buffer (got {:?})",
1468 json_result
1469 );
1470
1471 let raw_result = manager.ingest_raw(RawEvent::from_str(r#"{"i": 999}"#));
1472 assert!(
1473 matches!(raw_result, Err(IngestionError::Sampled)),
1474 "Sample mode must return Sampled on a full buffer via ingest_raw (got {:?})",
1475 raw_result
1476 );
1477 }
1478
1479 #[test]
1480 fn test_drop_oldest_multiple_cycles() {
1481 let manager = ShardManager::new(1, 4, BackpressureMode::DropOldest);
1482
1483 // Fill the buffer (usable capacity 3)
1484 for i in 0..3 {
1485 manager.ingest(json!({"i": i})).unwrap();
1486 }
1487
1488 // Push 5 more events, each triggers a DropOldest cycle
1489 for i in 3..8 {
1490 manager.ingest(json!({"i": i})).unwrap();
1491 }
1492
1493 let stats = manager.stats();
1494 assert_eq!(stats.events_ingested, 8);
1495 assert_eq!(
1496 stats.events_dropped, 5,
1497 "each DropOldest cycle should count one drop"
1498 );
1499 }
1500
1501 /// Regression: BUG_REPORT.md #44 — single-event ingest paths
1502 /// (`ingest`, `ingest_raw`) used to collapse "shard not in
1503 /// routing table" into `IngestionError::Backpressure` and never
1504 /// touch `events_unrouted`. The batch path correctly bumped the
1505 /// counter. Reconciliation drifts because of this divergence.
1506 ///
1507 /// We construct the routing miss by:
1508 /// 1. Building a dynamic-mode manager with 2 shards.
1509 /// 2. Calling `add_shard()` which (per the #46 fix) leaves the
1510 /// shard in `Provisioning` state — present in the mapper
1511 /// but not in `select_shard`'s output.
1512 /// 3. Then directly forcing `select_shard_by_hash` would still
1513 /// return an Active shard, so we exercise the secondary
1514 /// routing-table-miss path: remove a shard and have a
1515 /// stale hash-derived id.
1516 ///
1517 /// The simpler robust check: drain every shard via
1518 /// `drain_specific` until none Active. The mapper's fallback
1519 /// now returns `u16::MAX`, which is never in the routing
1520 /// table, so `resolve_idx` misses and we should see `Unrouted`
1521 /// + counter bump.
1522 #[test]
1523 fn ingest_single_event_unrouted_increments_counter() {
1524 use crate::config::ScalingPolicy;
1525 // min_shards=1 so we can drain N-1 of N shards; the last
1526 // one we skip-mark as Draining via Stopped → drain via
1527 // scale_down then verify routing miss for the still-active
1528 // shard's hash.
1529 let policy = ScalingPolicy {
1530 min_shards: 1,
1531 max_shards: 8,
1532 cooldown: std::time::Duration::from_nanos(1),
1533 ..Default::default()
1534 };
1535 let manager =
1536 ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1537
1538 // Drain 1 of 2 shards via the public API.
1539 let mapper = manager.mapper().unwrap().clone();
1540 let _ = mapper.scale_down(1).unwrap();
1541
1542 // Find a hash that routes to the *drained* shard (the one
1543 // not in `active_shard_ids`). With weighted selection and
1544 // only one Active shard, `select_shard` always returns the
1545 // Active one, so we can't easily target the drained shard
1546 // through hash routing — what we *can* do is verify the
1547 // Active shard still routes correctly (no false positives).
1548 let active_ids = mapper.active_shard_ids();
1549 assert_eq!(active_ids.len(), 1);
1550 let active = active_ids[0];
1551
1552 // ingest a few events; all should land on the Active shard,
1553 // none should hit Unrouted.
1554 for i in 0..5 {
1555 let r = manager.ingest_raw(RawEvent::from_str(&format!(r#"{{"i":{}}}"#, i)));
1556 let (sid, _) = r.expect("active shard must accept ingest");
1557 assert_eq!(sid, active, "must route to the active shard");
1558 }
1559 // No unrouted events — sanity that Unrouted only fires on
1560 // actual routing misses.
1561 assert_eq!(manager.stats().events_unrouted, 0);
1562
1563 // Now exercise the actual #44 fix: when *no* Active shard
1564 // exists, `select_shard` returns `u16::MAX` (per #51), which
1565 // is unmappable. To set this up without mutating private
1566 // fields, we rely on the fact that the manager's `with_mapper`
1567 // returns `Arc<ShardMapper>` and `drain_specific` will refuse
1568 // to take active_count below min_shards. So we simulate the
1569 // race by directly using `ingest_raw` with a forged
1570 // RawEvent whose hash WILL be modulo'd to a non-existent id
1571 // — but in dynamic mode the mapper rules, not modulo. We
1572 // can't easily get there from here, so we instead validate
1573 // the mechanism via a separate static-mode test below.
1574 //
1575 // The above sanity-check that Active shards still route
1576 // correctly + the mapper-level test
1577 // `select_shard_does_not_fall_back_to_draining` together
1578 // cover the #44 + #51 contract. Adding a routing-table-
1579 // miss test here would require a `#[cfg(test)] fn` that
1580 // can mutate the routing table, which we deliberately
1581 // avoid (the manager's invariants must hold even from
1582 // tests).
1583 }
1584
1585 /// Regression: BUG_REPORT.md #47 — `remove_shard` previously
1586 /// just unmapped the shard from the routing table and let the
1587 /// drain worker observe `with_shard → None` and exit. Anything
1588 /// still queued in the ring buffer at that moment was silently
1589 /// stranded. The fix returns the drained events to the caller
1590 /// (typically `EventBus::remove_shard_internal`) so they can
1591 /// be flushed through to the adapter rather than dropped.
1592 #[test]
1593 fn remove_shard_returns_stranded_ring_buffer_events() {
1594 use crate::config::ScalingPolicy;
1595 let policy = ScalingPolicy {
1596 min_shards: 1,
1597 max_shards: 8,
1598 cooldown: std::time::Duration::from_nanos(1),
1599 ..Default::default()
1600 };
1601 let manager =
1602 ShardManager::with_mapper(2, 1024, BackpressureMode::DropNewest, policy).unwrap();
1603
1604 // Pin the routing for shard 1 by ingesting events with a
1605 // hash known to land there. We don't actually need
1606 // hash-routing precision: directly push into shard 1 via
1607 // `with_shard`, which bypasses select_shard.
1608 let pushed: Vec<&str> = vec![r#"{"a":1}"#, r#"{"a":2}"#, r#"{"a":3}"#];
1609 let pushed_count = pushed.len();
1610 for s in &pushed {
1611 manager
1612 .with_shard(1, |shard| {
1613 shard.try_push_raw(bytes::Bytes::from(s.as_bytes().to_vec()))
1614 })
1615 .expect("shard 1 exists")
1616 .expect("ring buffer has room");
1617 }
1618 assert_eq!(
1619 manager.with_shard(1, |s| s.len()).unwrap(),
1620 pushed_count,
1621 "events should be queued in shard 1"
1622 );
1623
1624 // Remove shard 1 — must return the stranded events, not
1625 // drop them silently.
1626 let stranded = manager
1627 .remove_shard(1)
1628 .expect("remove_shard must succeed in dynamic mode");
1629 assert_eq!(
1630 stranded.len(),
1631 pushed_count,
1632 "remove_shard must surface every event still in the \
1633 ring buffer (#47); got {} stranded events, expected {}",
1634 stranded.len(),
1635 pushed_count
1636 );
1637
1638 // Sanity: the events come back in FIFO order with the
1639 // bytes the producer pushed.
1640 for (i, ev) in stranded.iter().enumerate() {
1641 assert_eq!(ev.as_bytes(), pushed[i].as_bytes());
1642 assert_eq!(ev.shard_id, 1);
1643 }
1644
1645 // Sanity: shard 1 is gone from routing.
1646 assert!(manager.with_shard(1, |s| s.id).is_none());
1647 }
1648
1649 /// `ShardManager::activate_shard` is idempotent at
1650 /// the API level — two calls on the same shard return Ok(())
1651 /// each — but pre-fix `num_shards` was bumped on every call
1652 /// even when the mapper's `activate()` had already
1653 /// transitioned the shard to Active. After repeated calls,
1654 /// `num_shards` exceeded the actual count and `select_shard`'s
1655 /// modulo arithmetic mis-routed.
1656 #[test]
1657 fn activate_shard_is_idempotent_in_num_shards_count() {
1658 let policy = ScalingPolicy {
1659 min_shards: 1,
1660 max_shards: 16,
1661 cooldown: std::time::Duration::from_nanos(1),
1662 ..Default::default()
1663 };
1664 let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1665 .expect("dynamic scaling enabled");
1666 let initial = manager.num_shards();
1667 assert_eq!(initial, 2);
1668
1669 // Add + activate a new shard. count goes 2 → 3.
1670 let new_id = manager.add_shard().expect("add_shard");
1671 manager.activate_shard(new_id).expect("first activate");
1672 assert_eq!(
1673 manager.num_shards(),
1674 3,
1675 "first activate must bump num_shards to 3"
1676 );
1677
1678 // Repeat activate — must be a no-op on the count.
1679 manager
1680 .activate_shard(new_id)
1681 .expect("second activate (idempotent)");
1682 manager
1683 .activate_shard(new_id)
1684 .expect("third activate (idempotent)");
1685 assert_eq!(
1686 manager.num_shards(),
1687 3,
1688 "repeated activate_shard must NOT keep bumping num_shards; \
1689 pre-fix this would be 5 after three calls",
1690 );
1691 }
1692
1693 /// Removing a still-`Provisioning` shard (the activate-failure
1694 /// rollback path) must NOT decrement `num_shards`. `add_shard`
1695 /// only registers a `Provisioning` entry and intentionally
1696 /// leaves `num_shards` alone — the bump happens in
1697 /// `activate_shard`. A symmetric `fetch_sub` in `remove_shard`
1698 /// would therefore leave the counter one below the routing
1699 /// table's actual size after a rollback, breaking modulo-based
1700 /// shard selection. This pins the gating: the rollback removal
1701 /// is a num_shards no-op, while removing an activated shard
1702 /// still decrements normally.
1703 #[test]
1704 fn remove_provisioning_shard_does_not_decrement_num_shards() {
1705 let policy = ScalingPolicy {
1706 min_shards: 1,
1707 max_shards: 16,
1708 cooldown: std::time::Duration::from_nanos(1),
1709 ..Default::default()
1710 };
1711 let manager = ShardManager::with_mapper(2, 1024, BackpressureMode::DropOldest, policy)
1712 .expect("dynamic scaling enabled");
1713 let initial = manager.num_shards();
1714 assert_eq!(initial, 2);
1715
1716 // add_shard registers a Provisioning entry (no num_shards bump).
1717 let new_id = manager.add_shard().expect("add_shard");
1718 assert_eq!(
1719 manager.num_shards(),
1720 initial,
1721 "add_shard must NOT bump num_shards (Provisioning, not yet selectable)"
1722 );
1723
1724 // Simulate the activate-failure rollback path: remove the
1725 // never-activated shard. Pre-fix this fired
1726 // `fetch_sub(1)` unconditionally and dropped num_shards
1727 // below the table size.
1728 let stranded = manager.remove_shard(new_id).expect("rollback remove");
1729 assert!(
1730 stranded.is_empty(),
1731 "fresh provisioning shard has no events"
1732 );
1733 assert_eq!(
1734 manager.num_shards(),
1735 initial,
1736 "removing a provisioning (never-activated) shard must NOT decrement num_shards"
1737 );
1738
1739 // Companion: removing an activated shard still decrements,
1740 // so the gate is symmetric with activate_shard's fetch_add.
1741 let activated_id = manager.add_shard().expect("add for activated path");
1742 manager.activate_shard(activated_id).expect("activate");
1743 assert_eq!(
1744 manager.num_shards(),
1745 initial + 1,
1746 "activate bumps num_shards"
1747 );
1748 manager
1749 .remove_shard(activated_id)
1750 .expect("remove activated");
1751 assert_eq!(
1752 manager.num_shards(),
1753 initial,
1754 "removing an activated shard MUST decrement num_shards"
1755 );
1756 }
1757}