Skip to main content

net/
bus.rs

1//! Main EventBus facade.
2//!
3//! The EventBus provides a unified API for:
4//! - Event ingestion (non-blocking)
5//! - Event consumption (async polling with filtering)
6//! - Lifecycle management
7
8use crossbeam_utils::CachePadded;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15
16use crate::adapter::{Adapter, NoopAdapter};
17use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
18use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
19use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
20use crate::event::{Batch, Event, RawEvent};
21use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
22
23#[cfg(feature = "jetstream")]
24use crate::adapter::JetStreamAdapter;
25#[cfg(feature = "net")]
26use crate::adapter::NetAdapter;
27#[cfg(feature = "redis")]
28use crate::adapter::RedisAdapter;
29
30/// The main event bus.
31///
32/// # Example
33///
34/// ```rust,ignore
35/// use net::{EventBus, EventBusConfig, Event};
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let bus = EventBus::new(EventBusConfig::default()).await?;
40///
41///     // Ingest events
42///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
43///
44///     // Poll events
45///     let response = bus.poll(ConsumeRequest::new(100)).await?;
46///
47///     bus.shutdown().await?;
48///     Ok(())
49/// }
50/// ```
51pub struct EventBus {
52    /// Shard manager for parallel ingestion.
53    shard_manager: Arc<ShardManager>,
54    /// Adapter for durable storage.
55    adapter: Arc<dyn Adapter>,
56    /// Poll merger for cross-shard consumption.
57    poll_merger: arc_swap::ArcSwap<PollMerger>,
58    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
59    /// block in `add_shard_internal` / `remove_shard_internal`.
60    /// Without this lock, two callers (e.g. scaling-monitor
61    /// add_shard racing manual_scale_down's remove_shard) read the
62    /// shard ids snapshot at slightly different points and then
63    /// race on the `arc_swap.store`. The write that lands second
64    /// can clobber the more-current view: T1 reads `{0..5}`, T2
65    /// reads `{1..4}`, T2 stores `{1..4}`, T1 stores `{0..5}` —
66    /// the published merger then routes polls to the just-removed
67    /// shard 0 until the next topology change repairs the
68    /// snapshot.
69    poll_merger_swap_lock: parking_lot::Mutex<()>,
70    /// Per-shard worker handles. Stored separately so shutdown can
71    /// await drain workers *before* batch workers — the drain
72    /// worker's final sweep races the batch worker's exit
73    /// otherwise, and any events the drain worker pushes to the
74    /// channel after the batch worker has stopped reading are
75    /// silently lost.
76    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
77    /// Channels for sending batches to workers (shard_id -> sender).
78    batch_senders: parking_lot::RwLock<
79        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
80    >,
81    /// Shutdown flag.
82    shutdown: Arc<AtomicBool>,
83    /// Gate signaling drain workers that the in-flight wait has
84    /// completed and they may safely run their final ring-buffer
85    /// sweep. Distinct from `shutdown` because the drain worker
86    /// observing `shutdown=true` alone is not enough: a producer
87    /// that read `shutdown=false` may still be mid-push, and if the
88    /// drain worker rushes through its final sweep before that push
89    /// is visible the event is stranded. `shutdown()` sets this
90    /// after waiting for `in_flight_ingests==0`, at which point the
91    /// Acquire load on the drain side synchronizes-with the Release
92    /// store here, transitively chaining through the SeqCst
93    /// in-flight handshake to make every observed-pre-shutdown push
94    /// visible to the drain worker's subsequent `pop_batch_into`.
95    drain_finalize_ready: Arc<AtomicBool>,
96    /// In-flight ingest counter. Incremented before each ingest's
97    /// shutdown check and decremented after the push completes (or
98    /// bails). `shutdown()` waits for this to drop to zero *after*
99    /// setting `shutdown=true` and *before* setting
100    /// `drain_finalize_ready=true` so no producer is mid-push when
101    /// the drain workers do their final sweep — closing the race
102    /// where a producer that observed `shutdown=false` could push
103    /// *after* the drain worker's last `pop_batch_into` returned
104    /// zero, leaving the event stranded in the ring buffer.
105    /// Pre-fix this was `AtomicU32`. A 4-billion-in-flight
106    /// wrap is not realistic in production, but the counter
107    /// participates in the shutdown protocol — a wrap to 0 would
108    /// trick the wait-for-zero loop into thinking shutdown was
109    /// safe to proceed while producers were still mid-push.
110    /// Widened to `AtomicU64` so the wrap is astronomical
111    /// (1.8e19 in-flight ingests).
112    ///
113    /// **PERF_AUDIT §1.1** — Pre-fix this was a single shared
114    /// `AtomicU64`, and every `ingest`/`ingest_raw` did a SeqCst
115    /// `fetch_add` + a SeqCst `shutdown.load` + (on drop) a SeqCst
116    /// `fetch_sub` against it. On a multi-producer bus the cache
117    /// line ping-ponged across cores on every event, re-serializing
118    /// what was otherwise a fully sharded path. Now striped across
119    /// 32 cache-padded slots; producers pick one at thread-local
120    /// sticky affinity, and the cold reader (`shutdown` wait-for-
121    /// zero) sums slots.
122    in_flight_ingests: StripedInFlight,
123    /// Set to `true` after `shutdown()` runs to completion. `Drop`
124    /// uses this to detect "dropped without an awaited shutdown" —
125    /// in that case events still in the ring buffers / mpsc channels
126    /// are silently lost (see `Drop` impl).
127    shutdown_completed: AtomicBool,
128    /// Configuration.
129    config: EventBusConfig,
130    /// Statistics.
131    stats: Arc<EventBusStats>,
132    /// Producer nonce. Loaded from
133    /// `config.producer_nonce_path` on startup when the path is
134    /// configured; otherwise falls back to the per-process default
135    /// from `event::batch_process_nonce`. Stamped on every batch
136    /// the bus emits — the worker spawn copies this u64 into
137    /// `BatchWorkerParams::producer_nonce`, and
138    /// `remove_shard_internal`'s stranded-flush uses it via
139    /// `Batch::with_nonce`.
140    producer_nonce: u64,
141    /// Scaling monitor task handle.
142    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
143}
144
145/// Worker handles for a single shard. The drain worker pumps
146/// events from the ring buffer into an mpsc channel; the batch
147/// worker reads from that channel and dispatches to the adapter.
148/// Shutdown ordering is load-bearing — see `EventBus::shutdown`.
149struct ShardWorkers {
150    batch: JoinHandle<()>,
151    drain: JoinHandle<()>,
152    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
153    /// `remove_shard_internal` reads this AFTER awaiting `batch`
154    /// to learn the worker's final post-flush sequence, then uses
155    /// it as the `sequence_start` for the stranded-ring-buffer
156    /// flush so the stranded msg-ids fall strictly past every
157    /// msg-id the worker emitted — without this, JetStream dedup
158    /// would silently drop the stranded batch when both used 0.
159    next_sequence: Arc<AtomicU64>,
160}
161
162/// RAII guard for an in-flight ingest. Decrements
163/// `in_flight_ingests` on drop so `shutdown()` can wait for the
164/// counter to reach zero. Holds the slot index the matching
165/// `fetch_add` hit so the matching `fetch_sub` lands on the same
166/// slot — sub'ing from a different slot would still net-zero the
167/// sum but would skew slot occupancies under churn, eventually
168/// underflowing one slot to u64::MAX.
169struct IngestGuard<'a> {
170    bus: &'a EventBus,
171    slot: usize,
172}
173
174impl Drop for IngestGuard<'_> {
175    fn drop(&mut self) {
176        self.bus.in_flight_ingests.fetch_sub_at(self.slot, 1);
177    }
178}
179
180/// Number of cache-padded slots the in-flight counter stripes
181/// across. 32 is large enough to absorb typical core counts
182/// without collisions; small enough that the cold `sum()` path
183/// stays cheap. Power-of-two so slot selection is a mask.
184const IN_FLIGHT_STRIPE_SLOTS: usize = 32;
185
186/// Striped, cache-padded `in_flight_ingests` counter. Per
187/// PERF_AUDIT §1.1 — splits the single contended `AtomicU64` into
188/// 32 cache-padded slots so each producer hits its own line.
189/// Shutdown's wait-for-zero loop sums the slots.
190struct StripedInFlight {
191    slots: [CachePadded<AtomicU64>; IN_FLIGHT_STRIPE_SLOTS],
192}
193
194impl StripedInFlight {
195    fn new() -> Self {
196        Self {
197            slots: std::array::from_fn(|_| CachePadded::new(AtomicU64::new(0))),
198        }
199    }
200
201    /// Pick a slot for this thread. Sticky per-thread via a
202    /// `thread_local!` cell so repeat calls from the same
203    /// producer thread hit the same cache line; first call
204    /// derives an index from `thread::current().id()`.
205    #[inline(always)]
206    fn slot_for_current_thread(&self) -> usize {
207        thread_local! {
208            static SLOT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
209        }
210        SLOT.with(|s| {
211            if let Some(idx) = s.get() {
212                return idx;
213            }
214            use std::collections::hash_map::DefaultHasher;
215            use std::hash::{Hash, Hasher};
216            let mut h = DefaultHasher::new();
217            std::thread::current().id().hash(&mut h);
218            let idx = (h.finish() as usize) & (IN_FLIGHT_STRIPE_SLOTS - 1);
219            s.set(Some(idx));
220            idx
221        })
222    }
223
224    #[inline(always)]
225    fn fetch_add_at(&self, slot: usize, n: u64) {
226        self.slots[slot].fetch_add(n, AtomicOrdering::SeqCst);
227    }
228
229    #[inline(always)]
230    fn fetch_sub_at(&self, slot: usize, n: u64) {
231        self.slots[slot].fetch_sub(n, AtomicOrdering::SeqCst);
232    }
233
234    /// Sum across all slots with SeqCst loads. Cold path —
235    /// shutdown's wait-for-zero + reconciliation only.
236    fn sum(&self) -> u64 {
237        self.slots
238            .iter()
239            .map(|s| s.load(AtomicOrdering::SeqCst))
240            .sum()
241    }
242}
243
244/// Event bus statistics.
245///
246/// **PERF_AUDIT §1.2** — Counters are `CachePadded` so the
247/// producer-hot `events_ingested`/`events_dropped` don't false-
248/// share with the batch-worker-hot
249/// `batches_dispatched`/`events_dispatched`. Pre-fix all four sat
250/// on 1-2 cache lines and every per-event `fetch_add` ping-ponged
251/// the line between ingesting producers and the batch worker.
252/// `CachePadded<AtomicU64>` derefs to `AtomicU64`, so external
253/// callers using `stats.events_ingested.load(...)` work unchanged.
254#[derive(Debug, Default)]
255pub struct EventBusStats {
256    /// Total events ingested.
257    pub events_ingested: CachePadded<AtomicU64>,
258    /// Events dropped due to backpressure.
259    pub events_dropped: CachePadded<AtomicU64>,
260    /// Batches dispatched to adapter.
261    ///
262    /// Pre-fix this field was declared but never incremented anywhere
263    /// — `flush()`'s Phase 2 progress probe (`bus.rs:815`) read it as
264    /// "did the BatchWorker make progress this `max_delay` window?",
265    /// observed `0 == 0`, and always early-broke after a single
266    /// window. On Windows-class timer resolution the race against the
267    /// BatchWorker's first `recv_timeout` tipped the wrong way for
268    /// `flush_is_a_delivery_barrier` regularly. Now incremented in
269    /// the BatchWorker spawn (after a successful `dispatch_batch`)
270    /// and in `remove_shard_internal`'s stranded-flush.
271    pub batches_dispatched: CachePadded<AtomicU64>,
272    /// Total events dispatched to the adapter (sum of batch lengths
273    /// from successful `on_batch` calls). Companion to
274    /// `batches_dispatched` — by the time `flush()` returns,
275    /// `events_dispatched + events_dropped == events_ingested`. FFI
276    /// consumers can also use this to monitor end-to-end delivery.
277    pub events_dispatched: CachePadded<AtomicU64>,
278    /// Set to `true` if `shutdown()` / `shutdown_via_ref()` had to
279    /// proceed past the in-flight-ingest grace deadline (5 s) with
280    /// producers still mid-push. The stranded count is counted into
281    /// `events_dropped` (already; pre-fix), but `shutdown` returns
282    /// `Ok(())` regardless — callers that need to distinguish
283    /// "clean shutdown" from "lossy shutdown" check this flag in
284    /// `bus.stats()` afterward (only meaningful for the
285    /// `shutdown_via_ref` path that doesn't consume the bus).
286    ///
287    /// Pre-fix the warning + `events_dropped` increment
288    /// were the only signal; `Result<(), AdapterError>` returned
289    /// `Ok` indistinguishable from a clean shutdown.
290    pub shutdown_was_lossy: std::sync::atomic::AtomicBool,
291}
292
293impl EventBus {
294    /// Create a new event bus with the given configuration.
295    pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
296        // Create adapter from config
297        let adapter: Box<dyn Adapter> = match &config.adapter {
298            AdapterConfig::Noop => Box::new(NoopAdapter::new()),
299            #[cfg(feature = "redis")]
300            AdapterConfig::Redis(redis_config) => {
301                Box::new(RedisAdapter::new(redis_config.clone())?)
302            }
303            #[cfg(feature = "jetstream")]
304            AdapterConfig::JetStream(js_config) => {
305                Box::new(JetStreamAdapter::new(js_config.clone())?)
306            }
307            #[cfg(feature = "net")]
308            AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
309        };
310
311        Self::new_with_adapter(config, adapter).await
312    }
313
314    /// Create a new event bus with a caller-supplied adapter.
315    ///
316    /// `config.adapter` is ignored — the supplied `adapter` is used
317    /// instead. Useful for tests that need to observe or inject
318    /// behavior at the adapter boundary (e.g. a counting adapter
319    /// for end-to-end delivery assertions, a flaky adapter for
320    /// retry-path coverage).
321    pub async fn new_with_adapter(
322        config: EventBusConfig,
323        mut adapter: Box<dyn Adapter>,
324    ) -> Result<Self, AdapterError> {
325        config
326            .validate()
327            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
328
329        // Initialize adapter (with timeout to prevent hanging on unreachable backends)
330        tokio::time::timeout(config.adapter_timeout, adapter.init())
331            .await
332            .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
333        let adapter: Arc<dyn Adapter> = Arc::from(adapter);
334
335        // Create shard manager (with or without dynamic scaling)
336        let shard_manager = if let Some(ref scaling_policy) = config.scaling {
337            Arc::new(
338                ShardManager::with_mapper(
339                    config.num_shards,
340                    config.ring_buffer_capacity,
341                    config.backpressure_mode,
342                    scaling_policy.clone(),
343                )
344                .map_err(|e| AdapterError::Fatal(e.to_string()))?,
345            )
346        } else {
347            Arc::new(ShardManager::new(
348                config.num_shards,
349                config.ring_buffer_capacity,
350                config.backpressure_mode,
351            ))
352        };
353
354        // Create poll merger.
355        //
356        // Pass the live id set rather than the count. At
357        // initial construction the ids are dense (`0..num_shards`),
358        // but using `shard_ids()` here keeps a single code path with
359        // the post-scaling re-stores below.
360        let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
361            adapter.clone(),
362            shard_manager.shard_ids(),
363        ));
364
365        // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
366        // doc on `EventBus` for the synchronization contract.
367        let shutdown = Arc::new(AtomicBool::new(false));
368        let drain_finalize_ready = Arc::new(AtomicBool::new(false));
369
370        // Stats are shared with every BatchWorker spawn so successful
371        // dispatches increment `batches_dispatched` / `events_dispatched`
372        // — `flush()`'s Phase 2 progress probe gates on those.
373        let stats = Arc::new(EventBusStats::default());
374
375        // Producer nonce. Persistent path → load-or-create
376        // the durable u64 so cross-process retries dedup against the
377        // prior incarnation. No path → per-process default (today's
378        // at-most-once-across-restart behavior).
379        let producer_nonce = match &config.producer_nonce_path {
380            Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
381                .map_err(|e| {
382                    AdapterError::Fatal(format!(
383                        "failed to load/create producer-nonce file at {}: {e}",
384                        path.display(),
385                    ))
386                })?
387                .nonce(),
388            None => crate::event::batch_process_nonce(),
389        };
390
391        // Create batch workers for each shard
392        let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
393            std::collections::HashMap::with_capacity(config.num_shards as usize);
394        let mut batch_senders =
395            std::collections::HashMap::with_capacity(config.num_shards as usize);
396
397        for shard_id in 0..config.num_shards {
398            let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
399
400            let next_sequence = Arc::new(AtomicU64::new(0));
401
402            let batch = spawn_batch_worker(BatchWorkerParams {
403                shard_id,
404                rx,
405                adapter: adapter.clone(),
406                shard_manager: shard_manager.clone(),
407                config: config.batch.clone(),
408                adapter_timeout: config.adapter_timeout,
409                batch_retries: config.adapter_batch_retries,
410                next_sequence: next_sequence.clone(),
411                stats: stats.clone(),
412                producer_nonce,
413            });
414
415            let drain = spawn_drain_worker_for_shard(
416                shard_id,
417                shard_manager.clone(),
418                tx.clone(),
419                shutdown.clone(),
420                drain_finalize_ready.clone(),
421            );
422
423            batch_workers.insert(
424                shard_id,
425                ShardWorkers {
426                    batch,
427                    drain,
428                    next_sequence,
429                },
430            );
431            batch_senders.insert(shard_id, tx);
432        }
433
434        let bus = Self {
435            shard_manager,
436            adapter,
437            poll_merger,
438            poll_merger_swap_lock: parking_lot::Mutex::new(()),
439            batch_workers: parking_lot::Mutex::new(batch_workers),
440            batch_senders: parking_lot::RwLock::new(batch_senders),
441            shutdown,
442            drain_finalize_ready,
443            in_flight_ingests: StripedInFlight::new(),
444            shutdown_completed: AtomicBool::new(false),
445            config,
446            stats,
447            producer_nonce,
448            scaling_monitor: parking_lot::Mutex::new(None),
449        };
450
451        Ok(bus)
452    }
453
454    /// Start the scaling monitor (if dynamic scaling is enabled).
455    /// This spawns a background task that periodically evaluates scaling decisions.
456    ///
457    /// The spawned task holds a `Weak<Self>` rather than a strong
458    /// `Arc<Self>` clone. With a strong clone the task kept the bus
459    /// alive forever, and `shutdown(self)` (which consumes by value)
460    /// was unreachable: callers with an `Arc<EventBus>` could not
461    /// `Arc::try_unwrap` to consume it because the spawned task always
462    /// held one of the strong refs.
463    ///
464    /// With a `Weak`, the monitor task upgrades each tick. Once the
465    /// last caller-held `Arc` is dropped, the upgrade fails and the
466    /// task exits cleanly. To shut down via `shutdown(self)`, the
467    /// caller must hold the only strong reference: `Arc::try_unwrap`
468    /// on the resulting bus succeeds because the spawned task only
469    /// holds a Weak.
470    pub fn start_scaling_monitor(self: &Arc<Self>) {
471        if self.config.scaling.is_none() {
472            return;
473        }
474
475        // Idempotency check: no-op when a monitor is already
476        // installed. Otherwise a second `start_scaling_monitor`
477        // call would overwrite the slot without aborting the
478        // previous `JoinHandle` — the displaced task would keep
479        // running, holding a `Weak<EventBus>`, only exiting when it
480        // next observed `shutdown` or failed to upgrade. Two
481        // concurrent monitors would briefly compete for
482        // `evaluate_scaling`'s lock, doubling the metrics-tick
483        // wakeup rate.
484        let mut slot = self.scaling_monitor.lock();
485        if slot.is_some() {
486            tracing::debug!("start_scaling_monitor: monitor already running, skipping");
487            return;
488        }
489
490        let weak = Arc::downgrade(self);
491        let handle = tokio::spawn(async move {
492            run_scaling_monitor_via_weak(weak).await;
493        });
494
495        *slot = Some(handle);
496    }
497
498    /// Internal: Add a new shard with its workers.
499    ///
500    /// The previous implementation called `shard_manager.add_shard()`
501    /// first, which atomically marked the shard `Active` and published
502    /// it to the routing table. So `select_shard` could route producer
503    /// pushes to the new id *before* any drain or batch worker existed,
504    /// leaving events queued in a buffer with no consumer (and
505    /// triggering the configured backpressure mode if the buffer
506    /// filled).
507    ///
508    /// The fix uses the two-phase API on `ShardManager`:
509    ///   1. `add_shard()` allocates the id and metrics collector,
510    ///      adds the shard to the routing table in `Provisioning`
511    ///      state — so `with_shard` works (which the drain worker
512    ///      needs) but `select_shard` skips it.
513    ///   2. Spawn batch + drain workers and register the sender.
514    ///   3. `activate_shard()` flips state to `Active`. Only now
515    ///      does `select_shard` start routing producer pushes.
516    async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
517        self.add_shard_internal_with_cooldown_policy(false).await
518    }
519
520    /// Like [`add_shard_internal`] but bypasses the auto-scaling
521    /// cooldown. See [`ShardManager::add_shard_force`].
522    async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
523        self.add_shard_internal_with_cooldown_policy(true).await
524    }
525
526    async fn add_shard_internal_with_cooldown_policy(
527        &self,
528        force: bool,
529    ) -> Result<u16, AdapterError> {
530        // Step 1: provisioning add — not yet selectable.
531        let new_id = if force {
532            self.shard_manager.add_shard_force()
533        } else {
534            self.shard_manager.add_shard()
535        }
536        .map_err(|e| AdapterError::Fatal(e.to_string()))?;
537
538        // Step 2: spawn workers and register the sender.
539        let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
540
541        let next_sequence = Arc::new(AtomicU64::new(0));
542
543        let batch = spawn_batch_worker(BatchWorkerParams {
544            shard_id: new_id,
545            rx,
546            adapter: self.adapter.clone(),
547            shard_manager: self.shard_manager.clone(),
548            config: self.config.batch.clone(),
549            adapter_timeout: self.config.adapter_timeout,
550            batch_retries: self.config.adapter_batch_retries,
551            next_sequence: next_sequence.clone(),
552            stats: self.stats.clone(),
553            producer_nonce: self.producer_nonce,
554        });
555
556        // Order of publish here matters. Today `select_shard`
557        // filters on `state == Active` and a shard is still
558        // `Provisioning` at this point, so producers cannot route
559        // to it yet — but the pattern (insert sender → spawn drain
560        // worker → activate_shard) is ordering-fragile: a future
561        // refactor that flips `activate_shard` ahead of the drain-
562        // worker spawn would leave a window where producers can
563        // push but no drain worker is polling. Pin the order by
564        // installing the sender BEFORE the drain worker (existing
565        // behaviour) so the worker observes a fully-published
566        // sender at spawn time; `activate_shard` runs further down.
567        self.batch_senders.write().insert(new_id, tx.clone());
568
569        let drain = spawn_drain_worker_for_shard(
570            new_id,
571            self.shard_manager.clone(),
572            tx,
573            self.shutdown.clone(),
574            self.drain_finalize_ready.clone(),
575        );
576
577        self.batch_workers.lock().insert(
578            new_id,
579            ShardWorkers {
580                batch,
581                drain,
582                next_sequence,
583            },
584        );
585
586        // Step 3: workers are live — flip the shard to Active so
587        // `select_shard` will route to it.
588        //
589        // On `activate_shard` failure we mirror
590        // `remove_shard_internal`'s teardown: drop the sender,
591        // unmap the provisioning entry (which atomically pops any
592        // residual ring-buffer events), gracefully await both
593        // workers (so the drain worker's `scratch` Vec sends its
594        // contents on the channel and the batch worker's
595        // `current_batch` is flushed via the channel-close path),
596        // then dispatch any stranded ring-buffer events through
597        // the adapter. Pre-fix this used `.abort()` on both
598        // handles which dropped the drain
599        // worker's scratch and the batch worker's current_batch
600        // without dispatch.
601        if let Err(e) = self.shard_manager.activate_shard(new_id) {
602            tracing::warn!(
603                shard_id = new_id,
604                error = %e,
605                "activate_shard failed; rolling back provisioning state"
606            );
607
608            // 1. Drop the bus-side sender. The drain worker still
609            //    holds its own clone, so the channel stays open
610            //    until `with_shard` returns None (step 2) and the
611            //    drain worker breaks out of its loop, dropping
612            //    its sender and finally closing the channel.
613            self.batch_senders.write().remove(&new_id);
614
615            // 2. Atomically pop any ring-buffer residue and
616            //    unmap the Provisioning entry. After this,
617            //    `with_shard(new_id)` returns None and the drain
618            //    worker exits at its next poll (after sending
619            //    any events it had already popped into `scratch`
620            //    on this iteration).
621            //
622            //    For a brand-new Provisioning shard the buffer
623            //    should be empty (`select_shard` skips
624            //    non-Active states), so `stranded` is normally
625            //    `Vec::new()`. The flush below is a defensive
626            //    no-op on the happy path but covers any future
627            //    code path that routes to a Provisioning shard
628            //    or any race window that tucked an event in
629            //    before `activate_shard` returned its error.
630            let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
631
632            // 3. Take ownership of the worker handles and await
633            //    them gracefully. Order: drain first (it pumps
634            //    its scratch + final-sweep contents into the
635            //    channel and exits), then batch (which receives
636            //    those events plus any prior channel residue,
637            //    flushes its own `current_batch`, and exits).
638            //
639            //    Awaiting in this order is what makes the drain
640            //    worker's scratch reach the adapter — the
641            //    drain's `Some(N>0)` arm `mem::replace`s scratch
642            //    into a batch and `sender.send(batch).await`s
643            //    it; that send must complete (or fail) before
644            //    the drain worker breaks. The batch worker's
645            //    `Ok(None)` arm then runs after both senders
646            //    are dropped and flushes any pending batch.
647            // Bound each JoinHandle await so a worker that's
648            // parked inside a slow adapter call (e.g. drain worker
649            // mid-`sender.send().await` against a backpressured
650            // channel because the batch worker is itself blocked
651            // in the adapter) cannot pin rollback indefinitely.
652            // 2x `adapter_timeout` is the natural ceiling: the
653            // batch worker uses `adapter_timeout` per dispatch and
654            // is expected to wake within that window. A timeout
655            // here leaks the JoinHandle — acceptable because
656            // step 1 already removed the bus-side sender, so the
657            // detached task can't observe new work and will exit
658            // on its next loop iteration.
659            let workers = self.batch_workers.lock().remove(&new_id);
660            // `worker_detached` is set when either join times out
661            // — the worker is still running on a leaked
662            // JoinHandle. In that case the `next_sequence` atomic
663            // is no longer a reliable upper bound: the detached
664            // worker may publish a final batch whose msg-ids land
665            // in the same `[next_sequence..N]` range we'd use for
666            // the stranded-flush, producing duplicate XADDs or
667            // JetStream dedup hits. Skip the stranded-flush when
668            // detached and surface the loss explicitly so the
669            // operator sees it.
670            let mut worker_detached = false;
671            let final_next_sequence = if let Some(workers) = workers {
672                let bound = self.config.adapter_timeout.saturating_mul(2);
673                match tokio::time::timeout(bound, workers.drain).await {
674                    Ok(Ok(())) => {}
675                    Ok(Err(err)) => {
676                        tracing::warn!(
677                            shard_id = new_id,
678                            error = %err,
679                            "drain worker JoinHandle errored on activate-failure rollback"
680                        );
681                    }
682                    Err(_) => {
683                        tracing::warn!(
684                            shard_id = new_id,
685                            timeout_ms = bound.as_millis() as u64,
686                            "drain worker did not exit within timeout on activate-failure rollback; detaching"
687                        );
688                        worker_detached = true;
689                    }
690                }
691                match tokio::time::timeout(bound, workers.batch).await {
692                    Ok(Ok(())) => {}
693                    Ok(Err(err)) => {
694                        tracing::warn!(
695                            shard_id = new_id,
696                            error = %err,
697                            "BatchWorker JoinHandle errored on activate-failure rollback"
698                        );
699                    }
700                    Err(_) => {
701                        tracing::warn!(
702                            shard_id = new_id,
703                            timeout_ms = bound.as_millis() as u64,
704                            "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
705                        );
706                        worker_detached = true;
707                    }
708                }
709                workers.next_sequence.load(AtomicOrdering::Acquire)
710            } else {
711                0
712            };
713
714            // 4. Dispatch any stranded events as a single-shot
715            //    batch so they reach durable storage with the
716            //    correct sequence-id segment. Identical to the
717            //    `remove_shard_internal` teardown — but only when
718            //    the worker actually exited. If it timed out and
719            //    is still running on a leaked handle, dispatching
720            //    here would emit msg-ids overlapping the worker's
721            //    final flush; surface the events as dropped
722            //    instead so the duplicate-on-the-wire hazard is
723            //    avoided.
724            if !stranded.is_empty() && worker_detached {
725                let count = stranded.len();
726                self.stats
727                    .events_dropped
728                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
729                self.stats
730                    .shutdown_was_lossy
731                    .store(true, AtomicOrdering::Release);
732                tracing::error!(
733                    shard_id = new_id,
734                    count,
735                    "activate-failure rollback: skipping stranded-flush \
736                     because a worker JoinHandle timed out and may still \
737                     be running; events would collide with the detached \
738                     worker's final batch on the wire. Counted as dropped."
739                );
740            } else if !stranded.is_empty() {
741                let count = stranded.len();
742                let batch = crate::event::Batch::with_nonce(
743                    new_id,
744                    stranded,
745                    final_next_sequence,
746                    self.producer_nonce,
747                );
748                let dispatched = dispatch_batch(
749                    &*self.adapter,
750                    Arc::new(batch),
751                    new_id,
752                    self.config.adapter_timeout,
753                    self.config.adapter_batch_retries,
754                )
755                .await;
756                if dispatched {
757                    self.stats
758                        .batches_dispatched
759                        .fetch_add(1, AtomicOrdering::Relaxed);
760                    self.stats
761                        .events_dispatched
762                        .fetch_add(count as u64, AtomicOrdering::Relaxed);
763                    tracing::info!(
764                        shard_id = new_id,
765                        count,
766                        "activate-failure rollback: flushed stranded events to adapter",
767                    );
768                } else {
769                    tracing::error!(
770                        shard_id = new_id,
771                        count,
772                        "activate-failure rollback: adapter rejected stranded events; \
773                         events lost"
774                    );
775                }
776            }
777
778            return Err(AdapterError::Fatal(e.to_string()));
779        }
780
781        // Update poll merger with the post-add id set. Hold
782        // `poll_merger_swap_lock` across the snapshot-and-store so a
783        // concurrent remove_shard can't sneak between our `shard_ids()`
784        // read and our `arc_swap.store` and clobber the published view
785        // with a stale snapshot.
786        {
787            let _swap_guard = self.poll_merger_swap_lock.lock();
788            self.poll_merger.store(Arc::new(PollMerger::new(
789                self.adapter.clone(),
790                self.shard_manager.shard_ids(),
791            )));
792        }
793
794        tracing::info!(shard_id = new_id, "Added new shard");
795        Ok(new_id)
796    }
797
798    /// Internal: Remove a stopped shard.
799    ///
800    /// Previously this dropped the worker `JoinHandle`s and unmapped
801    /// the shard without first draining its ring buffer. Any events
802    /// still queued at the moment of removal — even just a few from a
803    /// producer that pushed concurrently with the scale-down decision
804    /// — were silently stranded once the drain worker exited on
805    /// `with_shard → None`.
806    ///
807    /// The fix:
808    ///   1. Wait for the drain worker we're about to retire to flush
809    ///      the channel, by closing the bus-side sender first.
810    ///   2. Call `remove_shard`, which atomically pops the
811    ///      ring-buffer remnants and unmaps the shard. The drained
812    ///      events come back to us as a `Vec`.
813    ///   3. Hand those events directly to the adapter as a
814    ///      single-shot batch — bypassing the per-shard pipeline
815    ///      that's already being torn down — so they reach durable
816    ///      storage.
817    async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
818        // Step 1: drop the bus-side sender. The drain worker still
819        // has its own clone and will keep draining; we want it to
820        // exit when its `with_shard` call returns `None` after
821        // step 2's unmap.
822        self.batch_senders.write().remove(&shard_id);
823
824        // Step 2: atomically drain whatever's in the ring buffer and
825        // unmap. After this, `with_shard(shard_id)` returns `None`
826        // and the drain worker exits at its next poll.
827        let stranded = self
828            .shard_manager
829            .remove_shard(shard_id)
830            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
831
832        // Step 3: take ownership of the worker handles (move them
833        // OUT of the mutex map so we can `await` them — `await`
834        // consumes a `JoinHandle`). With the bus-side sender already
835        // dropped (step 1) and the shard unmapped (step 2), the
836        // drain worker exits at its next poll and drops its sender
837        // clone, which closes the BatchWorker's `rx`. The
838        // BatchWorker then flushes any pending `current_batch` and
839        // any events still buffered in the channel, dispatches them
840        // via the standard `dispatch_batch` path with their PROPER
841        // `next_sequence` values, and exits.
842        //
843        // Await order: drain first, then batch. The drain worker's
844        // `Some(N>0)` arm `mem::replace`s scratch into a batch and
845        // `sender.send(batch).await`s it; that send must complete
846        // (or fail) before drain breaks. The batch worker's
847        // `Ok(None)` arm runs after the sender drops and flushes
848        // any pending batch. Awaiting in the reverse order would
849        // park here forever: the batch worker's `recv()` only
850        // returns `None` once every sender clone (including the
851        // drain worker's) has dropped.
852        //
853        // Both awaits are bounded by `2 × adapter_timeout` so a
854        // worker parked inside a slow adapter call cannot pin
855        // teardown indefinitely. A timeout leaks the JoinHandle —
856        // acceptable because step 1 already removed the bus-side
857        // sender and step 2 unmapped the shard, so the detached
858        // task can't observe new work and will exit on its next
859        // loop iteration.
860        let workers = self.batch_workers.lock().remove(&shard_id);
861        let final_next_sequence = if let Some(workers) = workers {
862            let bound = self.config.adapter_timeout.saturating_mul(2);
863            match tokio::time::timeout(bound, workers.drain).await {
864                Ok(Ok(())) => {}
865                Ok(Err(e)) => {
866                    tracing::warn!(
867                        shard_id,
868                        error = %e,
869                        "drain worker JoinHandle errored on await; \
870                         drain worker should have already exited via \
871                         `with_shard -> None`",
872                    );
873                }
874                Err(_) => {
875                    tracing::warn!(
876                        shard_id,
877                        timeout_ms = bound.as_millis() as u64,
878                        "drain worker did not exit within timeout on remove_shard; detaching",
879                    );
880                }
881            }
882            match tokio::time::timeout(bound, workers.batch).await {
883                Ok(Ok(())) => {}
884                Ok(Err(e)) => {
885                    tracing::warn!(
886                        shard_id,
887                        error = %e,
888                        "BatchWorker JoinHandle errored on await; \
889                         proceeding with stranded-flush using last \
890                         published next_sequence",
891                    );
892                }
893                Err(_) => {
894                    tracing::warn!(
895                        shard_id,
896                        timeout_ms = bound.as_millis() as u64,
897                        "BatchWorker did not exit within timeout on remove_shard; detaching",
898                    );
899                }
900            }
901            workers.next_sequence.load(AtomicOrdering::Acquire)
902        } else {
903            // No worker registered for this shard — shouldn't
904            // happen on the normal scale-down path, but defensively
905            // fall back to 0. This branch only activates if a
906            // caller manages to call `remove_shard_internal` for a
907            // shard that was never spawned (or already removed).
908            0
909        };
910
911        // Step 4: flush the stranded ring-buffer events through the
912        // adapter in one shot, using `final_next_sequence` (NOT 0)
913        // as the `sequence_start`. The stranded batch's msg-ids
914        // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
915        // strictly past every msg-id the worker emitted. Using 0
916        // would collide with the worker's very first batch
917        // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
918        // window would silently drop the duplicate.
919        if !stranded.is_empty() {
920            let count = stranded.len();
921            // Use the bus's loaded producer nonce so the stranded
922            // batch's msg-ids share the same producer-identity
923            // segment as everything else this bus has emitted —
924            // critical for cross-process dedup.
925            let batch = crate::event::Batch::with_nonce(
926                shard_id,
927                stranded,
928                final_next_sequence,
929                self.producer_nonce,
930            );
931            let dispatched = dispatch_batch(
932                &*self.adapter,
933                Arc::new(batch),
934                shard_id,
935                self.config.adapter_timeout,
936                self.config.adapter_batch_retries,
937            )
938            .await;
939            if dispatched {
940                self.stats
941                    .batches_dispatched
942                    .fetch_add(1, AtomicOrdering::Relaxed);
943                self.stats
944                    .events_dispatched
945                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
946                tracing::info!(
947                    shard_id,
948                    count,
949                    sequence_start = final_next_sequence,
950                    "Removed shard: flushed stranded ring-buffer events to adapter"
951                );
952            } else {
953                tracing::error!(
954                    shard_id,
955                    count,
956                    sequence_start = final_next_sequence,
957                    "Removed shard: adapter rejected stranded ring-buffer events; \
958                     events lost"
959                );
960            }
961        }
962
963        // Update poll merger with the post-remove id set.
964        // Without this, a default-shards poll (`request.shards == None`)
965        // would still iterate `0..num_shards` and skip the live shard
966        // whose id is now the largest, while polling a nonexistent /
967        // recreated shard at the bottom of the range.
968        //
969        // `poll_merger_swap_lock` serializes against
970        // `add_shard_internal`'s matching block — see the field doc.
971        {
972            let _swap_guard = self.poll_merger_swap_lock.lock();
973            self.poll_merger.store(Arc::new(PollMerger::new(
974                self.adapter.clone(),
975                self.shard_manager.shard_ids(),
976            )));
977        }
978
979        tracing::info!(shard_id = shard_id, "Removed shard");
980        Ok(())
981    }
982
983    /// Try to enter an ingest critical section. Returns `None` if
984    /// shutdown is in progress, in which case the caller must
985    /// return `IngestionError::ShuttingDown` without touching the
986    /// shard manager.
987    ///
988    /// The `fetch_add` + load(`shutdown`) sequence pairs with
989    /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
990    /// `in_flight_ingests`. SeqCst on both sides closes the
991    /// stranding race: every ingest that is observed as in-flight
992    /// during shutdown's wait is guaranteed to complete before the
993    /// drain workers do their final ring-buffer sweep, so no event
994    /// can land in a ring buffer after the drain worker has stopped
995    /// reading from it.
996    #[inline(always)]
997    fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
998        let slot = self.in_flight_ingests.slot_for_current_thread();
999        self.in_flight_ingests.fetch_add_at(slot, 1);
1000        if self.shutdown.load(AtomicOrdering::SeqCst) {
1001            self.in_flight_ingests.fetch_sub_at(slot, 1);
1002            return None;
1003        }
1004        Some(IngestGuard { bus: self, slot })
1005    }
1006
1007    /// Ingest an event.
1008    ///
1009    /// This is a non-blocking operation. The event is added to the appropriate
1010    /// shard's ring buffer and will be batched and persisted asynchronously.
1011    ///
1012    /// # Returns
1013    ///
1014    /// The shard ID and insertion timestamp on success.
1015    #[inline]
1016    pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
1017        let _g = self
1018            .try_enter_ingest()
1019            .ok_or(IngestionError::ShuttingDown)?;
1020
1021        match self.shard_manager.ingest(event.into_inner()) {
1022            Ok((shard_id, ts)) => {
1023                self.stats
1024                    .events_ingested
1025                    .fetch_add(1, AtomicOrdering::Relaxed);
1026                Ok((shard_id, ts))
1027            }
1028            Err(e) => {
1029                self.stats
1030                    .events_dropped
1031                    .fetch_add(1, AtomicOrdering::Relaxed);
1032                Err(e)
1033            }
1034        }
1035    }
1036
1037    /// Ingest a raw event (pre-serialized with cached hash).
1038    ///
1039    /// This is the fastest ingestion path:
1040    /// - Uses pre-computed hash for shard selection (no serialization)
1041    /// - Stores bytes directly (no clone needed, reference-counted)
1042    ///
1043    /// # Returns
1044    ///
1045    /// The shard ID and insertion timestamp on success.
1046    #[inline]
1047    pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
1048        let _g = self
1049            .try_enter_ingest()
1050            .ok_or(IngestionError::ShuttingDown)?;
1051
1052        match self.shard_manager.ingest_raw(event) {
1053            Ok((shard_id, ts)) => {
1054                self.stats
1055                    .events_ingested
1056                    .fetch_add(1, AtomicOrdering::Relaxed);
1057                Ok((shard_id, ts))
1058            }
1059            Err(e) => {
1060                self.stats
1061                    .events_dropped
1062                    .fetch_add(1, AtomicOrdering::Relaxed);
1063                Err(e)
1064            }
1065        }
1066    }
1067
1068    /// Ingest a batch of events.
1069    ///
1070    /// This is more efficient than calling `ingest` repeatedly: events
1071    /// destined for the same shard share a single mutex acquisition.
1072    ///
1073    /// # Returns
1074    ///
1075    /// The number of successfully ingested events.
1076    pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
1077        // The shutdown gate lives in `ingest_raw_batch`, which we
1078        // forward to. No separate guard here — that would double-
1079        // count `in_flight_ingests` (once for this call, once for
1080        // the inner call) and could deadlock shutdown under high
1081        // contention.
1082        let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
1083        self.ingest_raw_batch(raw)
1084    }
1085
1086    /// Ingest a batch of raw events (fastest batch ingestion).
1087    ///
1088    /// Groups events by their destination shard and pushes each group
1089    /// under a single mutex acquisition.
1090    ///
1091    /// # Returns
1092    ///
1093    /// The number of successfully ingested events.
1094    pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1095        let _g = match self.try_enter_ingest() {
1096            Some(g) => g,
1097            None => return 0,
1098        };
1099
1100        let total = events.len();
1101        let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1102        if success > 0 {
1103            self.stats
1104                .events_ingested
1105                .fetch_add(success as u64, AtomicOrdering::Relaxed);
1106        }
1107        // Subtract `unrouted` from the buffer-fullness drop count
1108        // so the same event isn't tallied in both `events_unrouted`
1109        // (bumped inside `ShardManager::ingest_raw_batch`) and
1110        // `events_dropped` — using a plain `total - success` here
1111        // would double-count unrouted events. Backpressure drops
1112        // = events that reached a shard but failed to push.
1113        let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1114        if dropped > 0 {
1115            self.stats
1116                .events_dropped
1117                .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1118        }
1119        success
1120    }
1121
1122    /// Poll events from the bus.
1123    ///
1124    /// This retrieves events from storage according to the request parameters.
1125    ///
1126    /// # Topology-change visibility
1127    ///
1128    /// `ArcSwap::load()` snapshots the current `PollMerger` for
1129    /// the duration of this call. A concurrent `add_shard` /
1130    /// `remove_shard_internal` that `.store()`s a fresh merger
1131    /// only affects **subsequent** polls — this poll continues
1132    /// against the loaded snapshot's shard list.
1133    ///
1134    /// The implications:
1135    ///   - **add_shard mid-poll:** events ingested into the new
1136    ///     shard between the merger swap and our return are
1137    ///     invisible to this call. They appear on the next poll.
1138    ///   - **remove_shard_internal mid-poll:** the stale merger
1139    ///     still has the removed shard in its id list. Adapters
1140    ///     that lazy-create streams on `poll_shard` (JetStream
1141    ///     in particular) may recreate the stream and return
1142    ///     empty/stale data. The drained events are dispatched
1143    ///     to durable storage by `remove_shard_internal` itself
1144    ///     before this poll's adapter call lands; the next poll
1145    ///     loads the new merger and sees the correct shard set.
1146    ///
1147    /// In both cases the loss is transient and self-healing:
1148    /// pagination via `next_id` and the next poll's cursor pick
1149    /// up where we left off. Callers requiring strict
1150    /// "topology-stable" semantics should serialize their polls
1151    /// against scaling operations externally.
1152    pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1153        let merger = self.poll_merger.load();
1154        merger.poll(request).await
1155    }
1156
1157    /// Get the number of shards.
1158    pub fn num_shards(&self) -> u16 {
1159        self.shard_manager.num_shards()
1160    }
1161
1162    /// Get the adapter name.
1163    pub fn adapter_name(&self) -> &'static str {
1164        self.adapter.name()
1165    }
1166
1167    /// Check if the adapter is healthy.
1168    pub async fn is_healthy(&self) -> bool {
1169        self.adapter.is_healthy().await
1170    }
1171
1172    /// Get statistics.
1173    pub fn stats(&self) -> &EventBusStats {
1174        &self.stats
1175    }
1176
1177    /// Get shard statistics.
1178    pub fn shard_stats(&self) -> crate::shard::ShardStats {
1179        self.shard_manager.stats()
1180    }
1181
1182    /// Sum of `len()` across every shard's ring buffer.
1183    ///
1184    /// Mainly useful in tests and operational diagnostics: a
1185    /// non-zero value at the time of `Drop` (without an awaited
1186    /// `shutdown()`) would be silently lost, so `Drop` folds this
1187    /// into `events_dropped` before the bus disappears.
1188    pub fn pending_in_rings(&self) -> u64 {
1189        self.shard_manager.total_pending_in_rings()
1190    }
1191
1192    /// Flush all pending batches.
1193    ///
1194    /// Waits for all shard ring buffers to drain, then for the
1195    /// per-shard mpsc channels to drain, then for any pending batch
1196    /// inside each batch worker to time out and dispatch — and only
1197    /// then calls `adapter.flush()`.
1198    ///
1199    /// # Latency bound
1200    ///
1201    /// The total wall-clock budget is the sum of three phases:
1202    ///   * Phase 1 (ring-buffer drain): up to **5 s**.
1203    ///   * Phase 2 (channel + pending-batch drain): up to
1204    ///     `min(2 s, batch.max_delay × n_workers)` — capped at 2 s
1205    ///     so a misconfigured `max_delay` cannot inflate the budget.
1206    ///   * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
1207    ///     (default **30 s**).
1208    ///
1209    /// Worst-case `flush()` runtime is therefore **~37 s under
1210    /// default config**, NOT 5 s as an earlier doc-comment stated.
1211    /// Callers wiring `flush()` into request-path latencies (HTTP
1212    /// handler, RPC) MUST set `adapter_timeout` accordingly or run
1213    /// the flush under their own outer timeout. The 5-second figure
1214    /// describes Phase 1 only; the doc was misleading and is fixed
1215    /// here.
1216    ///
1217    /// The previous implementation slept a single `batch.max_delay`
1218    /// (default 10 ms) after the ring buffers drained and immediately
1219    /// called `adapter.flush()`. Events still in transit through the
1220    /// per-shard mpsc channel, the batch worker's pending batch, or
1221    /// the in-progress `adapter.on_batch` call (bounded only by
1222    /// `adapter_timeout`, default 30 s) could miss the flush. Callers
1223    /// using `flush()` as a delivery barrier silently lost events.
1224    pub async fn flush(&self) -> Result<(), AdapterError> {
1225        let start = tokio::time::Instant::now();
1226        let timeout = Duration::from_secs(5);
1227        let mut backoff = Duration::from_micros(100);
1228
1229        // Phase 1: wait for ring buffers to drain (drain workers
1230        // pump them into the per-shard mpsc channels).
1231        loop {
1232            if self.shard_manager.all_shards_empty() {
1233                break;
1234            }
1235            if start.elapsed() >= timeout {
1236                tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
1237                break;
1238            }
1239            tokio::time::sleep(backoff).await;
1240            backoff = (backoff * 2).min(Duration::from_millis(10));
1241        }
1242
1243        // Phase 2: wait until every event ingested before flush()
1244        // started has been handed to the adapter via `on_batch`.
1245        // Snapshot the `events_ingested` counter at flush entry —
1246        // that's our target. We then poll `events_dispatched` (sum
1247        // of batch lengths from successful adapter dispatches) plus
1248        // `events_dropped` (events the adapter rejected after retry
1249        // exhaustion or that never made it past backpressure). The
1250        // barrier is met when `events_dispatched + events_dropped >=
1251        // target`: every pre-flush ingest is accounted for in one
1252        // bucket or the other.
1253        //
1254        // This is a true delivery barrier — it doesn't rely on
1255        // "no progress this window" heuristics that race a
1256        // BatchWorker whose `batch_start` was set just before
1257        // flush() ran. A progress gate that reads
1258        // `batches_dispatched` only works if that counter is
1259        // actually incremented on every dispatch, and Windows
1260        // timer resolution alone has historically made any
1261        // single-`max_delay`-sleep approach a frequent flake.
1262        let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1263        let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1264        let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1265
1266        // Outer deadline still bounds Phase 2 in case a wedged
1267        // adapter never returns. `max_delay * num_workers` is the
1268        // worst-case shape (one partially-filled batch per worker,
1269        // each waiting its full `max_delay` to time out), capped at
1270        // 2 s — same upper bound as before.
1271        //
1272        // Read the worker count via the shard manager's atomic-
1273        // backed `num_shards()` rather than `batch_workers.lock()
1274        // .len()`. The previous spinlock-backed `.lock()` inside
1275        // an `async fn` could stall the runtime worker thread
1276        // under contention with concurrent `add_shard_internal` /
1277        // `remove_shard_internal` callers; the atomic accessor is
1278        // both faster and async-safe. Mismatch in the
1279        // worker-count vs shard-count snapshot only changes the
1280        // phase2 deadline by at most one `max_delay` step, which
1281        // is bounded by the outer 2s cap regardless.
1282        let n_workers = usize::from(self.shard_manager.num_shards());
1283        let phase2_budget = self
1284            .config
1285            .batch
1286            .max_delay
1287            .saturating_mul(n_workers.max(1) as u32);
1288        let phase2_deadline =
1289            tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));
1290
1291        // Inner poll cadence: re-check the counters every 1 ms (or
1292        // `max_delay / 16`, whichever is larger). The fast cadence
1293        // means we exit promptly once the BatchWorker dispatches,
1294        // rather than waking exactly once per `max_delay` and
1295        // potentially racing the dispatch by a few ms.
1296        let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
1297        loop {
1298            let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1299            let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1300            // The barrier: every event ingested pre-flush has been
1301            // either dispatched or dropped. The bus's invariant
1302            // `events_ingested = events_dispatched + events_dropped`
1303            // holds at quiescence; we wait until
1304            // `dispatched + dropped >= target_ingested`.
1305            //
1306            // Pre-fix this used `dispatched + (dropped -
1307            // dropped_at_start) >= target_ingested`, which under-
1308            // counted by `dropped_at_start`: even after every
1309            // pre-flush event was processed, the inequality
1310            // required `dropped_at_start` MORE post-flush events
1311            // before signalling done. Workloads with no post-flush
1312            // ingest hung at the barrier until the deadline fired.
1313            //
1314            // Cross-shard race remains: a fast shard's post-flush
1315            // dispatches can satisfy the global target while a
1316            // slow shard's pre-flush events linger in its mpsc
1317            // channel or pending batch. `all_shards_empty()`
1318            // checks ring buffers but not those downstream
1319            // queues. Operators relying on flush as a hard
1320            // delivery barrier should call it during quiet
1321            // ingest, or in `shutdown` (which gates ingest via
1322            // `try_enter_ingest`).
1323            let _ = dispatched_at_start; // reserved for future per-shard accounting
1324            if dispatched.saturating_add(dropped) >= target_ingested
1325                && self.shard_manager.all_shards_empty()
1326            {
1327                break;
1328            }
1329            if tokio::time::Instant::now() >= phase2_deadline {
1330                tracing::warn!(
1331                    target = target_ingested,
1332                    dispatched,
1333                    dropped = dropped.saturating_sub(dropped_at_start),
1334                    "flush: Phase 2 deadline reached before all ingested events were dispatched",
1335                );
1336                break;
1337            }
1338            tokio::time::sleep(poll_interval).await;
1339        }
1340
1341        // Phase 3: tell the adapter to flush whatever it has
1342        // buffered. Bounded by `adapter_timeout` so a hanging
1343        // adapter can't pin us forever.
1344        match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
1345            Ok(r) => r,
1346            Err(_) => {
1347                tracing::warn!(
1348                    "flush: adapter.flush timed out after {:?}",
1349                    self.config.adapter_timeout
1350                );
1351                Err(AdapterError::Fatal("adapter flush timed out".into()))
1352            }
1353        }
1354    }
1355
1356    /// Gracefully shut down the event bus.
1357    ///
1358    /// The shutdown order is load-bearing:
1359    ///
1360    ///   1. Signal `shutdown` so drain workers stop pulling from
1361    ///      ring buffers after their final sweep.
1362    ///   2. Await **drain workers** so every event the producer
1363    ///      has handed to the bus is now in the per-shard mpsc
1364    ///      channel.
1365    ///   3. Drop `batch_senders` so each channel's last sender is
1366    ///      gone — the next `recv().await` in a batch worker will
1367    ///      return `None`.
1368    ///   4. Await **batch workers**, which drain everything
1369    ///      remaining in their channel and exit on `recv() = None`.
1370    ///
1371    /// Reversing steps 2 and 4 (the previous design) silently
1372    /// dropped events: a batch worker that exited on the shutdown
1373    /// flag could leave events the drain worker pushed *after* its
1374    /// `try_recv` sweep stranded in the channel.
1375    pub async fn shutdown(self) -> Result<(), AdapterError> {
1376        self.shutdown_via_ref().await
1377    }
1378
1379    /// Shutdown via shared reference — same semantics as
1380    /// [`shutdown`](Self::shutdown), but does not consume `self`.
1381    ///
1382    /// Useful for callers that hold the bus behind `Arc<EventBus>`
1383    /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1384    /// into every `EventStream`) and therefore cannot satisfy
1385    /// `Arc::try_unwrap`. Idempotent: the first caller does the
1386    /// work; concurrent or subsequent callers wait for the
1387    /// `shutdown_completed` flag and return `Ok(())`.
1388    pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1389        // 1. CAS the shutdown flag false→true. SeqCst pairs with
1390        // `try_enter_ingest`'s shutdown check — any producer that
1391        // observed the previous `false` and is mid-push has its
1392        // `in_flight_ingests` increment ordered before this store
1393        // (the CAS-success branch is a release of the new `true`),
1394        // and so will be visible to the wait below.
1395        //
1396        // If the CAS loses (someone else — typically a concurrent
1397        // call or `Drop` — already flipped the flag), spin until
1398        // they finish. We can't run the rest of the body because
1399        // workers/senders may already be partially torn down.
1400        if self
1401            .shutdown
1402            .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1403            .is_err()
1404        {
1405            // Bound the wait so a `Drop`-only path (which sets
1406            // `shutdown=true` but never sets `shutdown_completed`)
1407            // doesn't spin forever.
1408            //
1409            // Distinguish the two outcomes for callers. If
1410            // `shutdown_completed` flips inside the window, we
1411            // return `Ok(())` and the caller can be sure the first
1412            // caller finished. If the deadline fires first, we
1413            // surface `AdapterError::Transient(_)` — the bus IS
1414            // being shut down (the flag is set), but completion is
1415            // not yet observable; the caller can treat this as
1416            // "another thread is working on it, retry the
1417            // is_shutdown_completed() poll if you need a hard
1418            // barrier."
1419            //
1420            // Returning `Ok(())` in both branches would let
1421            // shutdown-done assumptions silently drift under a slow
1422            // adapter (`adapter_timeout` default 30 s > the 10 s
1423            // spin deadline), letting subsequent code observe a
1424            // partially-shut-down bus.
1425            // 10s in production builds; overridable via the
1426            // `_TEST_OVERRIDE_SHUTDOWN_VIA_REF_DEADLINE` thread-local
1427            // in test builds so the slow-first-caller test doesn't
1428            // need to wall-clock-wait for the full deadline. The
1429            // override is `#[cfg(test)]`-only; production cargo
1430            // builds compile out the override entirely.
1431            let deadline_dur = shutdown_via_ref_spin_deadline();
1432            // Use `tokio::time::Instant` so tests using
1433            // `tokio::time::pause()` virtualize this clock too.
1434            // Pre-fix `std::time::Instant` was wall-clock and
1435            // ignored `pause()`, breaking timeout-bounded tests
1436            // that wanted to fast-forward the spin deadline.
1437            let deadline = tokio::time::Instant::now() + deadline_dur;
1438            while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1439                if tokio::time::Instant::now() >= deadline {
1440                    return Err(AdapterError::Transient(
1441                        "shutdown_via_ref: another caller is mid-shutdown; \
1442                         deadline elapsed before shutdown_completed \
1443                         flipped. The bus IS shutting down; poll \
1444                         is_shutdown_completed() if you need a hard \
1445                         barrier."
1446                            .into(),
1447                    ));
1448                }
1449                // `yield_now` re-queues immediately and keeps the
1450                // task hot, starving the workers we're waiting on
1451                // under contention. A short `sleep` parks the task
1452                // and lets the runtime schedule the workers.
1453                tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1454            }
1455            return Ok(());
1456        }
1457
1458        // 1a. Wait for in-flight ingests to drain BEFORE the drain
1459        // workers do their final ring-buffer sweep. Otherwise a
1460        // producer that observed `shutdown=false` could push *after*
1461        // the drain worker's last `pop_batch_into` returned zero,
1462        // leaving the event stranded in the ring buffer when the bus
1463        // is dropped.
1464        //
1465        // This is bounded: every producer either bails on the
1466        // SeqCst-synchronized shutdown check (no progress past the
1467        // increment) or completes its single non-blocking push and
1468        // decrements. Both paths take constant time; the total
1469        // wait is O(producer threads).
1470        //
1471        // The "every observed in-flight ingest completes before
1472        // the final sweep" property holds under normal conditions,
1473        // but the 5-second deadline below forces the gate open
1474        // even when producers are still in their push window. A
1475        // producer that has incremented `in_flight_ingests` (and
1476        // so observed `shutdown=false`) but whose push is delayed
1477        // past the deadline (heavy contention, debugger hit, etc.)
1478        // will complete its push AFTER the final sweep — its event
1479        // lands in the ring buffer and is never read. The deadline
1480        // exists so a stuck producer can't deadlock shutdown
1481        // indefinitely; the trade-off is documented data loss past
1482        // the 5 s window, surfaced via the `events_dropped` stat
1483        // (so the loss is observable to operators) and the `WARN`
1484        // log below (so it's diagnosable). The "no stranding"
1485        // promise on the happy path stands; the deadline path is
1486        // the documented escape hatch.
1487        // Use `tokio::time::Instant` so tests using
1488        // `tokio::time::pause()` virtualize this 5-second
1489        // deadline too — pre-fix the `std::time::Instant`
1490        // was wall-clock and ignored the test's paused clock.
1491        let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1492        // Snapshot of `(stranded, ingested, dispatched)` at the
1493        // deadline — Some(...) iff we hit the deadline path. The
1494        // post-drain reconciliation reads this to compute the
1495        // actual drop count (see comment further down).
1496        let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1497        while self.in_flight_ingests.sum() > 0 {
1498            if tokio::time::Instant::now() >= in_flight_deadline {
1499                let stranded = self.in_flight_ingests.sum();
1500                let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1501                let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1502                tracing::warn!(
1503                    in_flight = stranded,
1504                    lossy = true,
1505                    "shutdown timed out waiting for in-flight ingests after 5s; \
1506                     proceeding — up to {} events may strand in the ring buffer \
1507                     past final drain (documented data-loss path)",
1508                    stranded,
1509                );
1510                // Set the lossy flag immediately so a fast `is_*`
1511                // poll observes the outcome before the drain
1512                // finishes. The actual `events_dropped` bump is
1513                // deferred until after the final drain runs (see
1514                // "post-drain reconciliation" below) so we don't
1515                // double-count events that the drain still
1516                // successfully delivers — pre-fix this bumped
1517                // `events_dropped += stranded` here and the same
1518                // events that the final sweep then drained landed
1519                // in BOTH `events_ingested` and `events_dropped`,
1520                // breaking the bus's
1521                // `ingested == dispatched + dropped` invariant
1522                // and turning `shutdown_was_lossy` into a false
1523                // positive on every deadline-triggered shutdown.
1524                self.stats
1525                    .shutdown_was_lossy
1526                    .store(true, AtomicOrdering::Release);
1527                deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1528                break;
1529            }
1530            // Park instead of `yield_now`. The producers we're
1531            // waiting on contend for the same runtime threads;
1532            // re-queuing immediately starves their progress.
1533            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1534        }
1535
1536        // 1b. Release the drain-finalize gate.
1537        //
1538        // Pre-fix used `Ordering::Release` for this store
1539        // and relied on the SeqCst spin above (loading
1540        // `in_flight_ingests`) to provide the happens-before for
1541        // every observed-pre-shutdown push. That works today
1542        // because SeqCst loads carry an implicit fence, but a
1543        // future change to the spin's ordering (Relaxed for perf,
1544        // say) would silently break the drain worker's final-sweep
1545        // contract — producer pushes might not be visible to
1546        // `pop_batch_into`. Promote to SeqCst so the load-bearing
1547        // happens-before is explicit at this site, not derived
1548        // from another atomic's ordering choice.
1549        //
1550        // Caveat: the SeqCst happens-before above only covers the
1551        // *non-deadline* exit from the spin (the loop condition
1552        // observed `in_flight_ingests == 0`). On the deadline-break
1553        // path the loop exits with `in_flight_ingests > 0` — the
1554        // outstanding producer pushes have NOT been observed
1555        // synchronized-with this thread, and the SeqCst store below
1556        // does not retroactively create a happens-before with
1557        // pushes that are still mid-flight. Those events are
1558        // exactly the "stranded" events accounted via
1559        // `events_dropped` and `shutdown_was_lossy`; the contract
1560        // is "every observed-pre-shutdown push is visible to the
1561        // final sweep on the happy path; on the deadline path,
1562        // up to `stranded` events past the gate are surfaced as
1563        // dropped". The flag at line 1271 (`shutdown_was_lossy`)
1564        // is the operator-visible signal that this contract was
1565        // exercised on the lossy branch.
1566        self.drain_finalize_ready
1567            .store(true, AtomicOrdering::SeqCst);
1568
1569        // Stop the scaling monitor first — it's independent of the
1570        // ingestion path and just needs to observe the flag.
1571        let scaling_handle = self.scaling_monitor.lock().take();
1572        if let Some(handle) = scaling_handle {
1573            let _ = handle.await;
1574        }
1575
1576        // Take workers without holding the lock across await.
1577        let workers = std::mem::take(&mut *self.batch_workers.lock());
1578
1579        // 2. Await drain workers. Each one pops a final batch (up
1580        //    to 10k events) from its ring buffer, sends it on the
1581        //    channel, and exits. After this loop, every event in
1582        //    the ring buffers has been pushed to its channel.
1583        //
1584        // `join_all` lets the runtime overlap drain handles. A
1585        // sequential `for ... { drain.await; }` would serialize
1586        // shutdown wall-clock as N×T instead of max(T), which on
1587        // the default 1024-shard config × per-shard final-drain
1588        // time makes shutdown painful.
1589        let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1590            .into_iter()
1591            .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1592                ((shard_id, drain), (shard_id, batch))
1593            })
1594            .unzip();
1595
1596        // Surface drain-worker JoinErrors explicitly. The default
1597        // Tokio runtime does NOT log spawned-task panics, so a
1598        // `let _ = join_all(...)` would silently swallow a panic
1599        // and mask stranded events. tracing::error per failure
1600        // makes the incident grep-able post-mortem.
1601        let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1602        let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1603        for (shard_id, result) in drain_ids
1604            .iter()
1605            .copied()
1606            .zip(futures::future::join_all(drain_handles).await)
1607        {
1608            if let Err(e) = result {
1609                tracing::error!(
1610                    shard_id,
1611                    error = %e,
1612                    "drain worker JoinHandle errored on shutdown await"
1613                );
1614            }
1615        }
1616
1617        // 3. Drop the original senders so the channels close once
1618        //    drain-worker sender clones (already dropped above)
1619        //    are gone too. Without this, batch workers would block
1620        //    on `recv().await` forever.
1621        drop(std::mem::take(&mut *self.batch_senders.write()));
1622
1623        // 4. Await batch workers. They drain their channel until
1624        //    `recv() = None`, flush, and exit.
1625        //
1626        // Same parallelization as the drain phase, with the same
1627        // explicit JoinError surfacing.
1628        let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1629        for (shard_id, result) in drain_ids
1630            .into_iter()
1631            .zip(futures::future::join_all(batch_only).await)
1632        {
1633            if let Err(e) = result {
1634                tracing::error!(
1635                    shard_id,
1636                    error = %e,
1637                    "BatchWorker JoinHandle errored on shutdown await"
1638                );
1639            }
1640        }
1641
1642        // Flush and shutdown adapter (with timeout to prevent hanging)
1643        let timeout = self.config.adapter_timeout;
1644        if tokio::time::timeout(timeout, self.adapter.flush())
1645            .await
1646            .is_err()
1647        {
1648            tracing::error!("Adapter flush timed out during shutdown");
1649        }
1650        let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1651            .await
1652            .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1653
1654        // Post-drain reconciliation for the lossy-shutdown path.
1655        //
1656        // If we hit the in-flight deadline above, `deadline_snapshot`
1657        // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1658        // Some of those `stranded` producers' events landed in the
1659        // ring AFTER our deadline check but BEFORE the
1660        // `drain_finalize_ready` gate flipped — those events are
1661        // now successfully ingested, drained, and dispatched through
1662        // the adapter. They appear in `events_dispatched` (the
1663        // delta since the deadline), so:
1664        //
1665        //   actual_drops = stranded
1666        //                  - (dispatched_after_drain - dispatched@deadline)
1667        //                  - (ingested_after_drain - ingested@deadline)
1668        //                       only counting events that landed but
1669        //                       weren't dispatched (dropped under
1670        //                       backpressure, etc.)
1671        //
1672        // The cleaner reconciliation: events that completed
1673        // `try_enter_ingest` AFTER the deadline either completed
1674        // ingest (bumping `events_ingested`) or were dropped on
1675        // backpressure (bumping `events_dropped` from the existing
1676        // backpressure paths). The `stranded - delta_ingested`
1677        // remainder is producers whose `try_enter_ingest` succeeded
1678        // but never reached `shard_manager.ingest()` — those are
1679        // the genuinely-lost events we should account for.
1680        if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1681            let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1682            let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1683            // Known under-count window: a producer that completed
1684            // `shard_manager.ingest()` and bumped `events_ingested`
1685            // just BEFORE its `IngestGuard` decremented
1686            // `in_flight_ingests` will be counted in
1687            // `ingested_at_deadline` (already past the bump) AND in
1688            // `stranded` (still pending the decrement on that same
1689            // spin). That single event contributes `+1` to both
1690            // sides of the subtraction, so the saturating_sub
1691            // under-counts the drop by 1 per such interleaving.
1692            // The opposite direction (producer in-flight at
1693            // deadline that never completed ingest) is correctly
1694            // counted as a drop, so the bias is one-sided and
1695            // small (bounded by the number of producers mid-push
1696            // at the exact deadline moment — typically 0–1 even
1697            // under heavy load). Operators reading
1698            // `shutdown_was_lossy` get the right boolean; the
1699            // numeric `events_dropped` may under-count by a few
1700            // events on a lossy shutdown. Acceptable; the cost of
1701            // closing the window is paired-SeqCst reloads under
1702            // the spin which would dominate the shutdown path.
1703            let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1704            if actual_drops > 0 {
1705                self.stats
1706                    .events_dropped
1707                    .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1708            } else {
1709                // The deadline tripped the eager
1710                // `shutdown_was_lossy = true` set above, but the
1711                // final drain ingested every stranded event so
1712                // nothing was actually lost. Clear the flag so
1713                // operator dashboards alerting on
1714                // `was_lossy && events_dropped == 0` don't see a
1715                // false positive. Pre-fix the boolean stayed
1716                // `true` against `events_dropped == 0` for any
1717                // deadline-triggered shutdown whose drain
1718                // happened to catch up.
1719                self.stats
1720                    .shutdown_was_lossy
1721                    .store(false, AtomicOrdering::Release);
1722            }
1723            tracing::warn!(
1724                stranded_at_deadline = stranded,
1725                post_deadline_ingests,
1726                actual_drops,
1727                "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1728                 by stranded - post-deadline-ingests (pre-fix this bumped by \
1729                 the full `stranded` count, double-counting events the drain \
1730                 still successfully delivered)",
1731            );
1732        }
1733
1734        // Mark shutdown as completed so Drop knows not to warn.
1735        self.shutdown_completed.store(true, AtomicOrdering::Release);
1736        result
1737    }
1738
1739    /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1740    /// not imply the shutdown work has finished. Use
1741    /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1742    /// completion.
1743    pub fn is_shutdown(&self) -> bool {
1744        self.shutdown.load(AtomicOrdering::Acquire)
1745    }
1746
1747    /// True once `shutdown` / `shutdown_via_ref` has fully drained
1748    /// workers and the adapter shutdown returned (success path only).
1749    pub fn is_shutdown_completed(&self) -> bool {
1750        self.shutdown_completed.load(AtomicOrdering::Acquire)
1751    }
1752
1753    /// Get shard metrics (if dynamic scaling is enabled).
1754    pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1755        self.shard_manager.collect_metrics()
1756    }
1757
1758    /// Check if dynamic scaling is enabled.
1759    pub fn is_dynamic_scaling_enabled(&self) -> bool {
1760        self.config.scaling.is_some()
1761    }
1762
1763    /// Manually trigger a scale-up (for testing or manual intervention).
1764    ///
1765    /// Bypasses the auto-scaling cooldown so a deliberate operator
1766    /// request isn't rate-limited by the auto-scaling cadence.
1767    /// Pre-fix this looped `add_shard_internal()` N times, each
1768    /// of which bumped `last_scaling`, so iteration 1+ failed
1769    /// with `InCooldown` against any non-zero cooldown — the
1770    /// first shard was left half-added (workers spawned, routing
1771    /// entry installed) while the error propagated to the
1772    /// caller. The `max_shards` budget check still applies.
1773    pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1774        let mut new_ids = Vec::with_capacity(count as usize);
1775        for _ in 0..count {
1776            let id = self.add_shard_internal_force().await?;
1777            new_ids.push(id);
1778        }
1779        Ok(new_ids)
1780    }
1781
1782    /// Manually trigger a scale-down (for testing or manual intervention).
1783    ///
1784    /// Marks `count` shards as `Draining`, waits for them to empty,
1785    /// finalizes them to `Stopped`, and removes them from the
1786    /// routing table — mirroring the scaling monitor's per-tick
1787    /// finalize loop. Returns the IDs of shards that were
1788    /// successfully drained AND removed (subset of those marked
1789    /// Draining if any failed to empty within the deadline).
1790    ///
1791    /// Drives the full scale-down lifecycle synchronously: a
1792    /// plain `mapper.scale_down` call marks shards `Draining` but
1793    /// does NOT finalize them — finalization is the scaling
1794    /// monitor's responsibility. Bus configs without an active
1795    /// monitor (or callers that shut down before the monitor's
1796    /// next tick) would otherwise lose any events queued in those
1797    /// shards' ring buffers.
1798    pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1799        let mapper = self
1800            .shard_manager
1801            .mapper()
1802            .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1803
1804        let drained_ids = mapper
1805            .scale_down(count)
1806            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1807
1808        // `finalize_draining` requires the shard to have been
1809        // Draining for >100ms with an empty ring buffer and no
1810        // pushes since drain start. Poll until every requested
1811        // shard finalizes, capped by an outer deadline so a wedged
1812        // producer can't pin this method forever. Use
1813        // `tokio::time::Instant` so tests under `tokio::time::pause()`
1814        // advance the virtual clock via `sleep` rather than spinning
1815        // until wall-clock catches up.
1816        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1817        let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1818        let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1819
1820        while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1821            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1822            let stopped = mapper.finalize_draining();
1823            // `finalize_draining` is destructive — every qualifying
1824            // Draining shard transitions to Stopped in one shot,
1825            // regardless of who initiated the drain. Pre-fix the
1826            // `if target.contains(&shard_id)` filter dropped non-
1827            // target ids on the floor; if the scaling monitor (or
1828            // a parallel `manual_scale_down` on a different target
1829            // set) finalized one of THEIR shards in the same tick,
1830            // that shard ended up Stopped with workers + routing
1831            // entry intact — leaked. Always tear down via
1832            // `remove_shard_internal` so the bus-side state
1833            // (workers, sender, routing) is freed; only count the
1834            // target subset toward the returned Vec.
1835            for shard_id in stopped {
1836                let _ = self.remove_shard_internal(shard_id).await;
1837                if target.contains(&shard_id) {
1838                    finalized.insert(shard_id);
1839                }
1840            }
1841        }
1842
1843        // Surface partial success at WARN level. The return shape
1844        // is preserved for compat (changing to `(Vec, Vec)` would
1845        // break the existing callers + test) so a smaller-than-
1846        // target list could otherwise be silently mistaken for full
1847        // success; the WARN log gives operations tooling something
1848        // to alert on.
1849        if finalized.len() < target.len() {
1850            let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1851            tracing::warn!(
1852                requested = target.len(),
1853                finalized = finalized.len(),
1854                still_draining = ?still_draining,
1855                "manual_scale_down deadline elapsed before all targeted \
1856                 shards finalized — events still in-flight on the listed \
1857                 shards. They will finalize on the next scaling-monitor \
1858                 tick or on shutdown."
1859            );
1860        }
1861
1862        Ok(finalized.into_iter().collect())
1863    }
1864}
1865
1866impl Drop for EventBus {
1867    fn drop(&mut self) {
1868        // Signal shutdown so background tasks (drain workers, batch
1869        // workers, scaling monitor) observe the flag and exit. We
1870        // cannot await futures in Drop, but setting the atomic flag
1871        // triggers eventual termination.
1872        //
1873        // Previously used `Release` here while `try_enter_ingest`
1874        // and `shutdown()` use `SeqCst` on the same flag. `&mut self`
1875        // exclusion makes that sound today (no concurrent ingest can
1876        // observe a half-published shutdown). The mismatch is purely
1877        // defensive — switching to `SeqCst` matches the rest of the
1878        // lifecycle and removes a footgun if a future change ever
1879        // opened a path where `Drop` overlaps an in-flight
1880        // `try_enter_ingest`.
1881        self.shutdown.store(true, AtomicOrdering::SeqCst);
1882        // Also release the drain-finalize gate so any drain worker
1883        // already parked waiting for it can proceed and exit. Without
1884        // this, drop-without-shutdown leaves drain workers blocked on
1885        // `drain_finalize_ready` until their internal deadline fires
1886        // (which delays task cleanup by `DRAIN_FINALIZE_TIMEOUT`).
1887        // Best-effort durability: drop never gets the in-flight wait,
1888        // so any push that lands after this point is still lost.
1889        self.drain_finalize_ready
1890            .store(true, AtomicOrdering::SeqCst);
1891
1892        // Workers do NOT hold `Arc<EventBus>` — they hold
1893        // independent `Arc<ShardManager>` / `Arc<dyn Adapter>`
1894        // clones plus the channel halves. When `Drop` returns,
1895        // those Arcs survive in the still-running tasks and they
1896        // continue draining / dispatching until they observe the
1897        // shutdown flag we just set. There's no partial-Drop UB
1898        // risk: nothing on the worker side dereferences a
1899        // dropped EventBus field. The flags promote the
1900        // worker tasks from "blocked on recv / parked on
1901        // drain_finalize_ready" to "observe shutdown=true and
1902        // exit" so they don't linger indefinitely.
1903        //
1904        // If `shutdown()` was never awaited, any events still in the
1905        // per-shard ring buffers or mpsc channels are lost — the
1906        // adapter's `flush()` and `shutdown()` won't run, so durable
1907        // backends never see them. We can't fix that from `Drop` (no
1908        // async), but we *can* surface the data-loss risk loudly so
1909        // it doesn't hide. The check is bounded to "shutdown was
1910        // never started"; an in-progress shutdown is fine because the
1911        // call site is awaiting it.
1912        if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1913            // Count events still sitting in shard ring buffers. They
1914            // are stranded — the drain workers will see `shutdown =
1915            // true` and exit without flushing, the adapter's
1916            // `flush()`/`shutdown()` never run, so anything in the
1917            // rings at this point is permanently lost. Surface that
1918            // loss via `events_dropped` so post-mortem stats reflect
1919            // reality (operators alerting on `events_dropped > 0`
1920            // would otherwise miss the entire incident), and set
1921            // `shutdown_was_lossy` so the boolean view is consistent
1922            // with the counter view.
1923            //
1924            // Events in the BatchWorker mpsc channels or pending
1925            // batches are not counted here — those workers may still
1926            // observe the shutdown flag and exit, but we have no
1927            // synchronous way from Drop to enumerate them. The ring-
1928            // buffer count is a lower bound on the stranded total.
1929            //
1930            // Use the non-blocking accessor: if `Drop` runs on a
1931            // thread that already holds a shard mutex (single-thread
1932            // runtime + panic during shutdown is the canonical
1933            // hazard), the blocking `total_pending_in_rings` would
1934            // self-deadlock. Best-effort accounting is the right
1935            // trade-off here — we'd rather under-report stranded
1936            // events than wedge the drop forever.
1937            let (stranded_in_rings, uncounted_shards) =
1938                self.shard_manager.try_total_pending_in_rings();
1939            if uncounted_shards > 0 {
1940                tracing::warn!(
1941                    uncounted_shards,
1942                    "EventBus::drop: {uncounted_shards} shard(s) were locked at \
1943                     drop time and could not be accounted for in stranded_in_rings"
1944                );
1945            }
1946            if stranded_in_rings > 0 {
1947                self.stats
1948                    .events_dropped
1949                    .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
1950                self.stats
1951                    .shutdown_was_lossy
1952                    .store(true, AtomicOrdering::Release);
1953            }
1954
1955            let stats = self.shard_manager.stats();
1956            tracing::warn!(
1957                events_ingested = stats.events_ingested,
1958                events_dropped = stats.events_dropped,
1959                stranded_in_rings,
1960                "EventBus dropped without an awaited shutdown(). Any in-flight \
1961                 events still in the ring buffers or batch channels will be lost \
1962                 — the adapter's flush()/shutdown() never ran. Call \
1963                 `bus.shutdown().await` before dropping for durable shutdown."
1964            );
1965        }
1966    }
1967}
1968
1969/// Spin deadline for the second-caller path in
1970/// `shutdown_via_ref`. 10s in production.
1971#[cfg(not(test))]
1972fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
1973    std::time::Duration::from_secs(10)
1974}
1975
1976/// Test-only override. Stored as milliseconds; `0` (the default)
1977/// means "use the production 10s". Set via
1978/// [`set_shutdown_via_ref_spin_deadline_for_test`] from inside a
1979/// test that needs to exercise the deadline-elapsed path without
1980/// wall-clock-waiting the full 10s.
1981///
1982/// This is a global atomic shared across all tests in the
1983/// `cargo test --lib` binary. If two tests touched it concurrently,
1984/// one's override would leak into the other's expectations. Tests
1985/// that use the override MUST take the
1986/// [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] mutex for the duration of
1987/// their override-setter / read window — see
1988/// [`set_shutdown_via_ref_spin_deadline_for_test`].
1989#[cfg(test)]
1990static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: std::sync::atomic::AtomicU64 =
1991    std::sync::atomic::AtomicU64::new(0);
1992
1993/// Serializes access to the deadline override so concurrent tests
1994/// can't observe each other's transient values. Tests that override
1995/// the deadline take this mutex via
1996/// [`shutdown_deadline_override_lock`] and hold the guard until
1997/// they reset the override to 0.
1998///
1999/// Uses `tokio::sync::Mutex` rather than `std::sync::Mutex` so
2000/// the guard can legitimately be held across `.await` points
2001/// while the guarded test runs (clippy::await_holding_lock would
2002/// otherwise fire on the std variant — and rightly so for
2003/// production code).
2004#[cfg(test)]
2005static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
2006
2007/// Acquire the guard mutex protecting the deadline-override
2008/// static. Tests touching the override hold the returned guard
2009/// across both the set and the reset, so concurrent tests
2010/// observe a consistent default.
2011#[cfg(test)]
2012pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
2013    SHUTDOWN_DEADLINE_OVERRIDE_GUARD.lock().await
2014}
2015
2016#[cfg(test)]
2017fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
2018    let ms = SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(std::sync::atomic::Ordering::Relaxed);
2019    if ms == 0 {
2020        std::time::Duration::from_secs(10)
2021    } else {
2022        std::time::Duration::from_millis(ms)
2023    }
2024}
2025
2026#[cfg(test)]
2027pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
2028    SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.store(ms, std::sync::atomic::Ordering::Relaxed);
2029}
2030
2031#[cfg(test)]
2032impl EventBus {
2033    /// Test-only: bump `in_flight_ingests` by `n` to simulate
2034    /// stranded producers mid-call to `shard_manager.ingest`.
2035    /// The shutdown spin loop reads `in_flight_ingests` to decide
2036    /// whether work is still pending; the lossy-shutdown
2037    /// reconciliation tests need to drive that counter without
2038    /// the matching real `try_enter_ingest` / `ingest_complete`
2039    /// machinery, which would also bump `events_ingested` and
2040    /// confuse the (stranded, post_deadline_ingests) accounting.
2041    ///
2042    /// Tests own the lifecycle: every `stage_stranded_ingest(n)`
2043    /// MUST be paired with `release_stranded_ingest(n)` before
2044    /// the bus drops, otherwise the `Drop` impl's invariants
2045    /// fire.
2046    pub(crate) fn stage_stranded_ingest(&self, n: u64) {
2047        // Stripe-aware: tests don't care which slot the staged
2048        // count lands in; slot 0 is fine and `release_*` /
2049        // `complete_*` sub from the same slot for net-zero
2050        // accounting. The slot 0 placement is independent of any
2051        // real producer's `slot_for_current_thread()` selection,
2052        // which would (post-fix) most likely hit a different slot
2053        // — that's also fine, the sum is what `shutdown` reads.
2054        self.in_flight_ingests.fetch_add_at(0, n);
2055    }
2056
2057    /// Test-only: counterpart to [`stage_stranded_ingest`].
2058    /// Decrements `in_flight_ingests` by `n` without bumping
2059    /// `events_ingested`. Use when simulating producers that
2060    /// were stranded by the shutdown deadline and never
2061    /// completed (the drain reconciliation should classify them
2062    /// as drops).
2063    pub(crate) fn release_stranded_ingest(&self, n: u64) {
2064        // Mirror of `stage_stranded_ingest`: sub from slot 0.
2065        self.in_flight_ingests.fetch_sub_at(0, n);
2066    }
2067
2068    /// Test-only: simulate a previously-stranded producer
2069    /// completing AFTER the shutdown deadline. Bumps
2070    /// `events_ingested` first (Release ordering — the
2071    /// post-drain reconciliation reads it with Acquire), then
2072    /// releases the in-flight slot. The order matches what a
2073    /// real producer's `ingest` call site does, so the
2074    /// reconciliation's (ingested_after - ingested_at_deadline)
2075    /// arithmetic reads the same way.
2076    pub(crate) fn complete_stranded_ingest(&self, n: u64) {
2077        self.stats
2078            .events_ingested
2079            .fetch_add(n, std::sync::atomic::Ordering::Release);
2080        // Mirror of `stage_stranded_ingest`: sub from slot 0.
2081        self.in_flight_ingests.fetch_sub_at(0, n);
2082    }
2083}
2084
2085async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
2086    // Refresh `interval` from the policy on every tick. The previous
2087    // version cached it once at task start, so any future runtime
2088    // policy update would not be adopted by the monitor without a
2089    // process restart. Today `EventBusConfig` is immutable
2090    // post-construction so this is a no-op — but reading it each tick
2091    // is cheap (one atomic / RwLock read) and removes the latent
2092    // footgun.
2093    loop {
2094        let interval = match weak.upgrade() {
2095            Some(bus) => match &bus.config.scaling {
2096                Some(p) => p.metrics_window,
2097                None => return,
2098            },
2099            None => return,
2100        };
2101        tokio::time::sleep(interval).await;
2102
2103        let bus = match weak.upgrade() {
2104            Some(b) => b,
2105            // Last strong ref dropped — caller is shutting down (or
2106            // already gone). Exit cleanly.
2107            None => break,
2108        };
2109
2110        // SeqCst to match the writer side (`EventBus::shutdown` /
2111        // `Drop`). The Acquire/Release handshake on
2112        // `drain_finalize_ready` already provides the load-bearing
2113        // happens-before today — but a future code change that
2114        // piggybacks on `shutdown`'s ordering (e.g. a producer that
2115        // observes shutdown without going through
2116        // `try_enter_ingest`) would silently break under Relaxed.
2117        // Aligning the read-side ordering with the writer-side
2118        // SeqCst is a one-instruction tax for the safety.
2119        if bus.shutdown.load(AtomicOrdering::SeqCst) {
2120            break;
2121        }
2122
2123        // Collect metrics for observability.
2124        if let Some(metrics) = bus.shard_manager.collect_metrics() {
2125            for m in &metrics {
2126                if m.fill_ratio > 0.5 {
2127                    tracing::debug!(
2128                        shard_id = m.shard_id,
2129                        fill_ratio = m.fill_ratio,
2130                        event_rate = m.event_rate,
2131                        "Shard metrics"
2132                    );
2133                }
2134            }
2135        }
2136
2137        // Evaluate scaling.
2138        match bus.shard_manager.evaluate_scaling() {
2139            ScalingDecision::ScaleUp(count) => {
2140                tracing::info!(count = count, "Scaling up shards");
2141                for _ in 0..count {
2142                    if let Err(e) = bus.add_shard_internal().await {
2143                        tracing::error!(error = %e, "Failed to add shard");
2144                        break;
2145                    }
2146                }
2147            }
2148            ScalingDecision::ScaleDown(count) => {
2149                tracing::info!(count = count, "Scaling down shards");
2150                if let Some(mapper) = bus.shard_manager.mapper() {
2151                    if let Ok(drained) = mapper.scale_down(count) {
2152                        for shard_id in drained {
2153                            let _ = bus.shard_manager.drain_shard(shard_id);
2154                        }
2155                    }
2156                }
2157            }
2158            ScalingDecision::None => {}
2159        }
2160
2161        if let Some(mapper) = bus.shard_manager.mapper() {
2162            let stopped = mapper.finalize_draining();
2163            for shard_id in stopped {
2164                let _ = bus.remove_shard_internal(shard_id).await;
2165            }
2166        }
2167
2168        // CRITICAL: drop the strong ref BEFORE the next sleep so a
2169        // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
2170        // the last strong ref while we're sleeping.
2171        drop(bus);
2172    }
2173}
2174
2175/// Spawn a batch worker for a shard.
2176/// Dispatch a batch to the adapter with timeout and optional retries.
2177/// Returns true if the batch was accepted, false if all attempts failed.
2178///
2179/// Non-retryable errors (e.g. `AdapterError::Connection`,
2180/// `AdapterError::Fatal`, `AdapterError::Serialization`) skip the
2181/// retry loop and drop the batch immediately. Retrying a fatal error
2182/// just delays the inevitable while burning CPU and amplifying log
2183/// noise. Use `AdapterError::is_retryable` as the single source of
2184/// truth for this decision.
2185/// Compute the per-attempt backoff for `dispatch_batch` retries.
2186///
2187/// Pre-fix the retry loop slept a flat `Duration::from_millis(100)`
2188/// after every failure. Under a partial backend outage (Redis /
2189/// JetStream slow but not dead), every shard's BatchWorker retried
2190/// on the exact same 100 ms cadence, producing a synchronized
2191/// retry storm that amplified load while the backend was
2192/// recovering.
2193///
2194/// Post-fix: exponential backoff (100, 200, 400, 800, 1600, 3200 ms)
2195/// with per-(shard, attempt) jitter to decorrelate retries across
2196/// shards. Capped at attempt=5 (3.2 s base) so retries don't grow
2197/// unboundedly. Jitter is `[-25%, +25%]` of the base, derived from
2198/// hashing `(shard_id, attempt)` — deterministic per shard but
2199/// uncorrelated across shards, which is exactly what the storm
2200/// mitigation needs.
2201fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
2202    use std::collections::hash_map::DefaultHasher;
2203    use std::hash::{Hash, Hasher};
2204
2205    // 100 ms × 2^attempt, capped at attempt=5 → 100/200/400/800/1600/3200.
2206    let base_ms: u64 = 100u64.saturating_mul(1u64 << attempt.min(5));
2207
2208    let mut hasher = DefaultHasher::new();
2209    shard_id.hash(&mut hasher);
2210    attempt.hash(&mut hasher);
2211    let h = hasher.finish();
2212    // Jitter in [0, base_ms/2), centered to give [-25%, +25%].
2213    let jitter_range = (base_ms / 2).max(1);
2214    let jitter = (h % jitter_range) as i64 - (jitter_range as i64 / 2);
2215    let final_ms = (base_ms as i64 + jitter).max(1) as u64;
2216
2217    Duration::from_millis(final_ms)
2218}
2219
2220async fn dispatch_batch(
2221    adapter: &dyn Adapter,
2222    batch: Arc<Batch>,
2223    shard_id: u16,
2224    timeout: Duration,
2225    retries: u32,
2226) -> bool {
2227    // Retry attempts clone the `Arc` (refcount bump only); the final
2228    // attempt moves it. Pre-fix the batch was `Batch` and every retry
2229    // deep-cloned the events `Vec` — at the common `retries == 0`
2230    // path this was a wasted N×Bytes refcount bump + Vec alloc on
2231    // every batch.
2232    for attempt in 0..retries {
2233        match tokio::time::timeout(timeout, adapter.on_batch(Arc::clone(&batch))).await {
2234            Ok(Ok(())) => return true,
2235            Ok(Err(e)) => {
2236                if !e.is_retryable() {
2237                    // Tag with a `reason` field so this
2238                    // distinct drop cause is separately filterable
2239                    // from retry-exhausted and timeout in
2240                    // observability tools. Shutdown is distinguished
2241                    // from generic non-retryable so an operator
2242                    // chasing "why are batches being dropped" can
2243                    // immediately tell a sequencing bug (sending to
2244                    // a stopped adapter) from a transport / config
2245                    // failure.
2246                    let reason = if e.is_shutdown() {
2247                        "adapter_shutdown"
2248                    } else {
2249                        "non_retryable"
2250                    };
2251                    tracing::error!(
2252                        shard_id,
2253                        error = %e,
2254                        attempt,
2255                        reason,
2256                        "Non-retryable error from adapter, dropping batch"
2257                    );
2258                    return false;
2259                }
2260                tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2261            }
2262            Err(_) => {
2263                tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2264            }
2265        }
2266        tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2267    }
2268
2269    // Pre-fix the final attempt collapsed every drop into
2270    // one log line ("Failed to dispatch batch, dropping"), making
2271    // it impossible to tell retry-exhausted from fatal-non-
2272    // retryable from timeout in metrics. The non-retryable case
2273    // already has its own log inside the retry loop above (early
2274    // return); here we tag retry-exhausted vs timeout-after-
2275    // retries with distinct `reason` fields so log-based
2276    // observability tools can break the drops out by cause.
2277    match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2278        Ok(Ok(())) => true,
2279        Ok(Err(e)) => {
2280            tracing::error!(
2281                shard_id,
2282                error = %e,
2283                reason = "retry_exhausted",
2284                attempts = retries + 1,
2285                "Failed to dispatch batch after exhausting retries, dropping"
2286            );
2287            false
2288        }
2289        Err(_) => {
2290            tracing::error!(
2291                shard_id,
2292                reason = "timeout",
2293                attempts = retries + 1,
2294                "Adapter on_batch timed out on final attempt, dropping batch"
2295            );
2296            false
2297        }
2298    }
2299}
2300
2301struct BatchWorkerParams {
2302    shard_id: u16,
2303    rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
2304    adapter: Arc<dyn Adapter>,
2305    shard_manager: Arc<ShardManager>,
2306    config: BatchConfig,
2307    adapter_timeout: Duration,
2308    batch_retries: u32,
2309    /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
2310    /// stores its post-flush sequence here on every dispatch so the
2311    /// bus can read it after the worker exits — see
2312    /// `ShardWorkers::next_sequence` for the consumer side.
2313    next_sequence: Arc<AtomicU64>,
2314    /// Bus-level stats. The worker increments
2315    /// `batches_dispatched` and `events_dispatched` after every
2316    /// successful `dispatch_batch`. Both must actually be
2317    /// incremented here, otherwise `flush()`'s Phase 2 progress
2318    /// probe would always observe zero progress and early-break
2319    /// after a single `max_delay` window — racing the
2320    /// BatchWorker's first `recv_timeout` and flaking on
2321    /// Windows-class timer resolution.
2322    stats: Arc<EventBusStats>,
2323    /// Producer nonce stamped on every batch the worker emits.
2324    /// Bus-loaded from the persistent path when
2325    /// `producer_nonce_path` is configured, otherwise from the
2326    /// per-process default.
2327    producer_nonce: u64,
2328}
2329
2330fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2331    let BatchWorkerParams {
2332        shard_id,
2333        mut rx,
2334        adapter,
2335        shard_manager,
2336        config,
2337        adapter_timeout,
2338        batch_retries,
2339        next_sequence,
2340        stats,
2341        producer_nonce,
2342    } = params;
2343    tokio::spawn(async move {
2344        let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2345
2346        loop {
2347            // Wait for events with timeout. The batch worker exits
2348            // only when its channel is closed — i.e. after every
2349            // upstream sender (the drain worker for this shard +
2350            // `EventBus::batch_senders`) has been dropped.
2351            // `EventBus::shutdown` enforces that ordering so no
2352            // event is left stranded in the channel.
2353            let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2354
2355            match tokio::time::timeout(recv_timeout, rx.recv()).await {
2356                Ok(Some(events)) => {
2357                    if let Some(batch) = worker.add_events(events) {
2358                        let batch_len = batch.len() as u64;
2359                        if dispatch_batch(
2360                            &*adapter,
2361                            Arc::new(batch),
2362                            shard_id,
2363                            adapter_timeout,
2364                            batch_retries,
2365                        )
2366                        .await
2367                        {
2368                            stats
2369                                .batches_dispatched
2370                                .fetch_add(1, AtomicOrdering::Relaxed);
2371                            stats
2372                                .events_dispatched
2373                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2374                            // PERF_AUDIT §1.4 — lock-free counter
2375                            // bump on the per-shard `ShardCounters`
2376                            // (parallel `Vec` exposed by
2377                            // `ShardManager`). Pre-fix this
2378                            // `shard_ref.lock().record_batch_dispatch()`
2379                            // took the producer-hot shard mutex just
2380                            // to bump an atomic, injecting periodic
2381                            // latency spikes into the ingest path.
2382                            shard_manager.record_batch_dispatch(shard_id);
2383                        }
2384                    }
2385                }
2386                Ok(None) => {
2387                    // Channel closed — drain any pending and exit.
2388                    if worker.has_pending() {
2389                        let batch = worker.flush();
2390                        if !batch.is_empty() {
2391                            let batch_len = batch.len() as u64;
2392                            if dispatch_batch(
2393                                &*adapter,
2394                                Arc::new(batch),
2395                                shard_id,
2396                                adapter_timeout,
2397                                batch_retries,
2398                            )
2399                            .await
2400                            {
2401                                stats
2402                                    .batches_dispatched
2403                                    .fetch_add(1, AtomicOrdering::Relaxed);
2404                                stats
2405                                    .events_dispatched
2406                                    .fetch_add(batch_len, AtomicOrdering::Relaxed);
2407                            }
2408                        }
2409                    }
2410                    break;
2411                }
2412                Err(_) => {
2413                    // Timeout - check if we need to flush.
2414                    // Pre-fix [perf #38] called `worker.add_events(vec![])`
2415                    // as the signal, allocating an empty `Vec` per
2416                    // timeout tick. `check_timeout` is the direct
2417                    // expression of intent — no alloc.
2418                    if let Some(batch) = worker.check_timeout() {
2419                        let batch_len = batch.len() as u64;
2420                        if dispatch_batch(
2421                            &*adapter,
2422                            Arc::new(batch),
2423                            shard_id,
2424                            adapter_timeout,
2425                            batch_retries,
2426                        )
2427                        .await
2428                        {
2429                            stats
2430                                .batches_dispatched
2431                                .fetch_add(1, AtomicOrdering::Relaxed);
2432                            stats
2433                                .events_dispatched
2434                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2435                            // PERF_AUDIT §1.4 — lock-free counter
2436                            // bump on the per-shard `ShardCounters`
2437                            // (parallel `Vec` exposed by
2438                            // `ShardManager`). Pre-fix this
2439                            // `shard_ref.lock().record_batch_dispatch()`
2440                            // took the producer-hot shard mutex just
2441                            // to bump an atomic, injecting periodic
2442                            // latency spikes into the ingest path.
2443                            shard_manager.record_batch_dispatch(shard_id);
2444                        }
2445                    }
2446                }
2447            }
2448        }
2449    })
2450}
2451
2452/// Maximum time a drain worker waits for `drain_finalize_ready`
2453/// after observing `shutdown=true`. Defense in depth: if a caller
2454/// drops the bus mid-shutdown without setting the gate, we don't
2455/// want the worker pinned forever. The shutdown path *always* sets
2456/// the gate (even on its own timeout), so this deadline is normally
2457/// unreached.
2458const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_secs(10);
2459
2460/// Spawn a drain worker for a single shard.
2461///
2462/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle
2463/// allocation happens *outside* the shard mutex critical section.
2464/// Each cycle: lock → drain into scratch (no alloc, capacity already
2465/// reserved) → unlock → `mem::replace` swaps the filled scratch out
2466/// for a fresh empty `Vec` (alloc *outside* the lock) → send the
2467/// filled batch on the channel.
2468fn spawn_drain_worker_for_shard(
2469    shard_id: u16,
2470    shard_manager: Arc<ShardManager>,
2471    sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
2472    shutdown: Arc<AtomicBool>,
2473    drain_finalize_ready: Arc<AtomicBool>,
2474) -> JoinHandle<()> {
2475    const STEADY_BATCH: usize = 1_000;
2476    const FINAL_BATCH: usize = 10_000;
2477
2478    tokio::spawn(async move {
2479        let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);
2480
2481        loop {
2482            // SeqCst to match the writer side (`EventBus::shutdown` /
2483            // `Drop`). `try_enter_ingest` itself uses SeqCst, and
2484            // the Acquire/Release handshake on
2485            // `drain_finalize_ready` (below) is what actually makes
2486            // the producer-push happen-before visible. Aligning to
2487            // SeqCst here makes the contract robust to future
2488            // producer-side changes that might piggyback on
2489            // `shutdown`'s ordering.
2490            if shutdown.load(AtomicOrdering::SeqCst) {
2491                // Before doing the final sweep, wait for `shutdown()`
2492                // to release the finalize gate. The gate is set only
2493                // after the in-flight ingest counter reaches zero,
2494                // which means every producer that read `shutdown=false`
2495                // has completed its push. Without this wait, the drain
2496                // worker can race ahead of a late push under
2497                // shard-mutex serialization (drain takes the lock
2498                // first, sees nothing, exits; producer then takes the
2499                // lock and pushes — event stranded).
2500                //
2501                // Acquire pairs with the Release in `EventBus::shutdown`
2502                // and `EventBus::drop`, transitively making every
2503                // producer push that happened-before its `in_flight`
2504                // decrement visible to the subsequent `pop_batch_into`.
2505                // `tokio::time::Instant` so virtualized-clock tests
2506                // (`tokio::time::pause`) advance the deadline via
2507                // `sleep` rather than spinning until wall-clock catches
2508                // up.
2509                let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
2510                while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
2511                    if tokio::time::Instant::now() >= finalize_deadline {
2512                        tracing::warn!(
2513                            shard_id,
2514                            "drain worker timed out waiting for finalize gate; \
2515                             proceeding with potential event loss"
2516                        );
2517                        break;
2518                    }
2519                    // Park instead of `yield_now` so we don't
2520                    // starve the workers / producers we're waiting
2521                    // on under contention.
2522                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
2523                }
2524
2525                // Final drain: loop until the ring buffer is empty.
2526                // A single 10k batch is not enough — the ring
2527                // buffer can hold up to `ring_buffer_capacity`
2528                // events (default 1M) and any leftover would be
2529                // silently lost on shutdown.
2530                //
2531                // Pre-fix this broke at the first
2532                // `popped == 0`. The audit posited a narrow race
2533                // where a producer that fetch_add'd
2534                // in_flight_ingests but stalled before the
2535                // shard-lock body could push AFTER shutdown
2536                // observed in_flight=0 yet BEFORE this final
2537                // sweep saw the event. The SeqCst guard pattern
2538                // makes this unlikely (the push happens-before
2539                // the guard drop), but the defense is cheap:
2540                // require TWO consecutive zero-event passes
2541                // before declaring drain. The yield_now between
2542                // them gives a stalled producer a chance to land
2543                // the push.
2544                let mut final_scratch: Vec<crate::event::InternalEvent> =
2545                    Vec::with_capacity(FINAL_BATCH);
2546                let mut consecutive_zeros = 0u32;
2547                loop {
2548                    let popped = shard_manager
2549                        .with_shard(shard_id, |shard| {
2550                            shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
2551                        })
2552                        .unwrap_or(0);
2553                    if popped == 0 {
2554                        consecutive_zeros += 1;
2555                        if consecutive_zeros >= 2 {
2556                            break;
2557                        }
2558                        // Yield to let any racing producer commit
2559                        // its push, then re-poll.
2560                        tokio::task::yield_now().await;
2561                        continue;
2562                    }
2563                    consecutive_zeros = 0;
2564                    let batch =
2565                        std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
2566                    let batch_len = batch.len();
2567                    if let Err(_send_err) = sender.send(batch).await {
2568                        // Batch worker exited before drain. The
2569                        // `mem::replace` already pulled events out
2570                        // of the ring buffer, so the dropped batch
2571                        // is unrecoverable — the SendError carries
2572                        // it back but the consumer is gone. Surface
2573                        // the count loudly so the loss is
2574                        // observable in operator dashboards rather
2575                        // than a silent miss in shutdown stats.
2576                        tracing::error!(
2577                            shard_id,
2578                            dropped = batch_len,
2579                            "drain worker (final): batch worker dropped \
2580                             channel before final drain completed; \
2581                             events removed from ring buffer cannot be redelivered",
2582                        );
2583                        break;
2584                    }
2585                }
2586                break;
2587            }
2588
2589            // Drain events from ring buffer.
2590            let popped = shard_manager.with_shard(shard_id, |shard| {
2591                shard.pop_batch_into(&mut scratch, STEADY_BATCH)
2592            });
2593
2594            match popped {
2595                Some(0) => {
2596                    // No events — yield briefly. The 100μs sleep is deliberate:
2597                    // this is a latency-first system where the drain loop is the
2598                    // hot path. Longer backoff would add milliseconds of latency
2599                    // to the first event after a quiet period, violating the
2600                    // sub-microsecond design target. The CPU cost of 100μs polling
2601                    // is acceptable for a system that processes 10M+ events/sec.
2602                    tokio::time::sleep(Duration::from_micros(100)).await;
2603                }
2604                Some(_) => {
2605                    let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
2606                    let batch_len = batch.len();
2607                    if let Err(_send_err) = sender.send(batch).await {
2608                        // Steady-state: the only way the batch
2609                        // worker drops the channel is if it
2610                        // panicked or `remove_shard_internal`
2611                        // tore it down out of order with the
2612                        // drain worker (which the documented
2613                        // shutdown sequence forbids). Either way,
2614                        // the events are unrecoverable — the
2615                        // `mem::replace` above already pulled them
2616                        // out of the ring buffer. Pre-fix this
2617                        // simply `break`-d, leaving the loss
2618                        // invisible. Surface a loud error with
2619                        // the dropped count so an out-of-order
2620                        // shutdown or batch-worker panic shows up
2621                        // in dashboards rather than as a silent
2622                        // metric gap.
2623                        tracing::error!(
2624                            shard_id,
2625                            dropped = batch_len,
2626                            "drain worker: batch worker dropped channel \
2627                             during steady-state drain; events removed from \
2628                             ring buffer cannot be redelivered",
2629                        );
2630                        break;
2631                    }
2632                }
2633                None => {
2634                    // Shard no longer exists (was removed)
2635                    break;
2636                }
2637            }
2638        }
2639    })
2640}
2641
2642#[cfg(test)]
2643mod tests {
2644    use super::*;
2645    use crate::shard::ScalingPolicy;
2646    use serde_json::json;
2647
2648    /// PERF_AUDIT §1.1 — striped counter sums correctly across
2649    /// concurrent producers. Spawns 16 threads, each does 10_000
2650    /// add+sub on its sticky slot. After joins the sum must be 0
2651    /// (every add was paired with a sub), and slots must NOT all
2652    /// be 0 mid-flight (proving the stripe was actually used —
2653    /// not collapsed onto one slot by accident).
2654    #[test]
2655    fn striped_in_flight_sum_across_concurrent_producers() {
2656        let counter = std::sync::Arc::new(StripedInFlight::new());
2657        let mut handles = Vec::new();
2658        for _ in 0..16 {
2659            let c = counter.clone();
2660            handles.push(std::thread::spawn(move || {
2661                let slot = c.slot_for_current_thread();
2662                for _ in 0..10_000 {
2663                    c.fetch_add_at(slot, 1);
2664                    c.fetch_sub_at(slot, 1);
2665                }
2666            }));
2667        }
2668        for h in handles {
2669            h.join().unwrap();
2670        }
2671        assert_eq!(
2672            counter.sum(),
2673            0,
2674            "every add was paired with a sub: net sum must be zero"
2675        );
2676    }
2677
2678    /// PERF_AUDIT §1.1 — the stripe's accounting contract, pinned
2679    /// deterministically: every slot accumulates independently and
2680    /// `sum()` aggregates all of them. A regression collapsing the
2681    /// stripe onto one shared slot (the pre-§1.1 shape) aliases the
2682    /// per-slot counts and fails the per-slot assertions below.
2683    ///
2684    /// Deliberately does NOT assert how many distinct slots a set
2685    /// of threads lands on: the thread→slot mapping hashes
2686    /// `ThreadId`s, whose distribution std does not guarantee, so a
2687    /// "N distinct slots populated" assertion is nondeterministic
2688    /// (it can fail under legitimate thread-id collisions). The
2689    /// mapping's actual contract — in-bounds and sticky per thread
2690    /// — is asserted separately below.
2691    #[test]
2692    fn striped_in_flight_slots_account_independently() {
2693        let counter = StripedInFlight::new();
2694        // Distinct count per slot so aliasing between any two slots
2695        // changes at least one per-slot readback.
2696        for slot in 0..IN_FLIGHT_STRIPE_SLOTS {
2697            counter.fetch_add_at(slot, slot as u64 + 1);
2698        }
2699        let expected_total: u64 = (1..=IN_FLIGHT_STRIPE_SLOTS as u64).sum();
2700        assert_eq!(counter.sum(), expected_total, "sum must cover every slot");
2701        for slot in 0..IN_FLIGHT_STRIPE_SLOTS {
2702            assert_eq!(
2703                counter.slots[slot].load(AtomicOrdering::SeqCst),
2704                slot as u64 + 1,
2705                "slot {slot} must hold exactly its own count (no aliasing)"
2706            );
2707        }
2708    }
2709
2710    /// PERF_AUDIT §1.1 — thread→slot mapping contract: in-bounds
2711    /// for every thread, and sticky (repeat calls from the same
2712    /// thread return the same slot, so a producer's add and its
2713    /// guard's sub land on the same cache line).
2714    #[test]
2715    fn striped_in_flight_slot_mapping_is_in_bounds_and_sticky() {
2716        let counter = std::sync::Arc::new(StripedInFlight::new());
2717        let mut handles = Vec::new();
2718        for _ in 0..16 {
2719            let c = counter.clone();
2720            handles.push(std::thread::spawn(move || {
2721                let first = c.slot_for_current_thread();
2722                let second = c.slot_for_current_thread();
2723                assert!(first < IN_FLIGHT_STRIPE_SLOTS, "slot out of bounds");
2724                assert_eq!(first, second, "slot must be sticky per thread");
2725            }));
2726        }
2727        for h in handles {
2728            h.join().unwrap();
2729        }
2730    }
2731
2732    /// PERF_AUDIT §1.1 — the shutdown gate must survive the
2733    /// striping. After `shutdown_via_ref` completes, `ingest` must
2734    /// be rejected with `ShuttingDown`, AND the rejected attempt
2735    /// must leave the striped sum at exactly zero — the gate's
2736    /// add-check-sub sequence nets out on the producer's slot. A
2737    /// leaked increment here would skew the stranded-count
2738    /// accounting any later wait-for-zero reader relies on.
2739    #[tokio::test]
2740    async fn striped_in_flight_shutdown_gate_rejects_and_nets_zero() {
2741        let config = EventBusConfig::builder()
2742            .num_shards(2)
2743            .ring_buffer_capacity(1024)
2744            .build()
2745            .unwrap();
2746        let bus = EventBus::new(config).await.unwrap();
2747        bus.shutdown_via_ref().await.unwrap();
2748
2749        let err = bus.ingest(Event::new(json!({"late": true}))).unwrap_err();
2750        assert!(
2751            matches!(err, IngestionError::ShuttingDown),
2752            "post-shutdown ingest must fail closed, got {err:?}"
2753        );
2754        assert_eq!(
2755            bus.in_flight_ingests.sum(),
2756            0,
2757            "rejected gate entry must net-zero its stripe slot"
2758        );
2759    }
2760
2761    #[tokio::test]
2762    async fn test_event_bus_basic() {
2763        let config = EventBusConfig::builder()
2764            .num_shards(2)
2765            .ring_buffer_capacity(1024)
2766            .build()
2767            .unwrap();
2768
2769        let bus = EventBus::new(config).await.unwrap();
2770
2771        // Ingest some events
2772        for i in 0..10 {
2773            let event = Event::new(json!({"index": i}));
2774            bus.ingest(event).unwrap();
2775        }
2776
2777        // Give workers time to process
2778        tokio::time::sleep(Duration::from_millis(100)).await;
2779
2780        // Check stats
2781        assert_eq!(
2782            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2783            10
2784        );
2785
2786        bus.shutdown().await.unwrap();
2787    }
2788
2789    #[tokio::test]
2790    async fn test_event_bus_batch_ingest() {
2791        let config = EventBusConfig::default();
2792        let bus = EventBus::new(config).await.unwrap();
2793
2794        let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2795
2796        let ingested = bus.ingest_batch(events);
2797        assert_eq!(ingested, 100);
2798
2799        bus.shutdown().await.unwrap();
2800    }
2801
2802    #[tokio::test]
2803    async fn test_event_bus_with_dynamic_scaling() {
2804        let policy = ScalingPolicy {
2805            min_shards: 2,
2806            max_shards: 8,
2807            ..Default::default()
2808        };
2809
2810        let config = EventBusConfig::builder()
2811            .num_shards(2)
2812            .ring_buffer_capacity(1024)
2813            .scaling(policy)
2814            .build()
2815            .unwrap();
2816
2817        let bus = EventBus::new(config).await.unwrap();
2818
2819        // Verify dynamic scaling is enabled
2820        assert!(bus.is_dynamic_scaling_enabled());
2821        assert_eq!(bus.num_shards(), 2);
2822
2823        // Ingest some events
2824        for i in 0..100 {
2825            let event = Event::new(json!({"index": i}));
2826            bus.ingest(event).unwrap();
2827        }
2828
2829        // Give workers time to process
2830        tokio::time::sleep(Duration::from_millis(100)).await;
2831
2832        // Check stats
2833        assert_eq!(
2834            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2835            100
2836        );
2837
2838        bus.shutdown().await.unwrap();
2839    }
2840
2841    #[tokio::test]
2842    async fn test_manual_scale_up() {
2843        let policy = ScalingPolicy {
2844            min_shards: 2,
2845            max_shards: 8,
2846            cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2847            ..Default::default()
2848        };
2849
2850        let config = EventBusConfig::builder()
2851            .num_shards(2)
2852            .ring_buffer_capacity(1024)
2853            .scaling(policy)
2854            .build()
2855            .unwrap();
2856
2857        let bus = EventBus::new(config).await.unwrap();
2858
2859        assert_eq!(bus.num_shards(), 2);
2860
2861        // Manually scale up
2862        let new_ids = bus.manual_scale_up(2).await.unwrap();
2863        assert_eq!(new_ids.len(), 2);
2864        assert_eq!(bus.num_shards(), 4);
2865
2866        // Ingest events - they should be distributed across all shards
2867        for i in 0..100 {
2868            let event = Event::new(json!({"index": i}));
2869            bus.ingest(event).unwrap();
2870        }
2871
2872        tokio::time::sleep(Duration::from_millis(100)).await;
2873
2874        assert_eq!(
2875            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2876            100
2877        );
2878
2879        bus.shutdown().await.unwrap();
2880    }
2881
2882    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2883    /// `manual_scale_down` only called `mapper.scale_down(count)`,
2884    /// which marks shards as `Draining` but does NOT finalize them.
2885    /// Bus configs without an active scaling monitor (or callers
2886    /// shutting down before the monitor's next tick) lost any
2887    /// events queued in the drained shards' ring buffers because
2888    /// `remove_shard_internal` was never invoked. The fix runs the
2889    /// full lifecycle synchronously: scale_down → poll for empty →
2890    /// finalize_draining → remove_shard_internal.
2891    ///
2892    /// We pin this by scaling up, manually scaling down, and
2893    /// asserting that `num_shards` actually decreases — pre-fix
2894    /// the count would still reflect the Draining shards.
2895    #[tokio::test]
2896    async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2897        let policy = ScalingPolicy {
2898            min_shards: 2,
2899            max_shards: 8,
2900            cooldown: Duration::from_nanos(1),
2901            ..Default::default()
2902        };
2903        let config = EventBusConfig::builder()
2904            .num_shards(2)
2905            .ring_buffer_capacity(1024)
2906            .scaling(policy)
2907            .build()
2908            .unwrap();
2909        let bus = EventBus::new(config).await.unwrap();
2910
2911        // Scale up to 4, then back down to 2.
2912        let added = bus.manual_scale_up(2).await.unwrap();
2913        assert_eq!(added.len(), 2);
2914        assert_eq!(bus.num_shards(), 4);
2915
2916        let removed = bus.manual_scale_down(2).await.unwrap();
2917        assert_eq!(
2918            removed.len(),
2919            2,
2920            "manual_scale_down must complete the lifecycle for both \
2921             requested shards (mark Draining → wait → finalize → remove)"
2922        );
2923
2924        // Pre-fix: `num_shards` would still be 4 because shards
2925        // were only marked Draining (and the routing-table removal
2926        // path never ran). Post-fix: it's back to 2.
2927        assert_eq!(
2928            bus.num_shards(),
2929            2,
2930            "drained shards must be removed from the routing table"
2931        );
2932
2933        bus.shutdown().await.unwrap();
2934    }
2935
2936    #[tokio::test]
2937    async fn test_shard_metrics() {
2938        let policy = ScalingPolicy::default();
2939
2940        let config = EventBusConfig::builder()
2941            .num_shards(2)
2942            .ring_buffer_capacity(1024)
2943            .scaling(policy)
2944            .build()
2945            .unwrap();
2946
2947        let bus = EventBus::new(config).await.unwrap();
2948
2949        // Ingest some events
2950        for i in 0..50 {
2951            let event = Event::new(json!({"index": i}));
2952            bus.ingest(event).unwrap();
2953        }
2954
2955        // Get metrics
2956        let metrics = bus.shard_metrics();
2957        assert!(metrics.is_some());
2958        let metrics = metrics.unwrap();
2959        assert_eq!(metrics.len(), 2);
2960
2961        bus.shutdown().await.unwrap();
2962    }
2963
2964    #[tokio::test]
2965    async fn test_regression_eventbus_drop_signals_shutdown() {
2966        // Regression: dropping an EventBus without calling shutdown() used to
2967        // leave background tasks running indefinitely. The Drop impl now sets
2968        // the shutdown flag so workers eventually exit.
2969        let result = tokio::time::timeout(Duration::from_secs(5), async {
2970            let config = EventBusConfig::builder()
2971                .num_shards(2)
2972                .ring_buffer_capacity(1024)
2973                .build()
2974                .unwrap();
2975
2976            let bus = EventBus::new(config).await.unwrap();
2977
2978            // Ingest some events
2979            for i in 0..10 {
2980                let event = Event::new(json!({"index": i}));
2981                bus.ingest(event).unwrap();
2982            }
2983
2984            // Drop without calling shutdown()
2985            drop(bus);
2986
2987            // If we reach here, the drop didn't hang
2988        })
2989        .await;
2990
2991        assert!(
2992            result.is_ok(),
2993            "EventBus drop should not hang — Drop impl must signal shutdown"
2994        );
2995    }
2996
2997    #[tokio::test]
2998    async fn test_with_dynamic_scaling_builder() {
2999        let config = EventBusConfig::builder()
3000            .num_shards(4)
3001            .ring_buffer_capacity(2048)
3002            .with_dynamic_scaling()
3003            .build()
3004            .unwrap();
3005
3006        let bus = EventBus::new(config).await.unwrap();
3007
3008        assert!(bus.is_dynamic_scaling_enabled());
3009        assert_eq!(bus.num_shards(), 4);
3010
3011        bus.shutdown().await.unwrap();
3012    }
3013
3014    /// Mock adapter that counts `on_batch` invocations and returns a
3015    /// configurable error variant. Used to assert dispatch retry
3016    /// semantics without dragging in a real adapter.
3017    struct CountingErrAdapter {
3018        calls: Arc<std::sync::atomic::AtomicU32>,
3019        make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
3020    }
3021
3022    #[async_trait::async_trait]
3023    impl crate::adapter::Adapter for CountingErrAdapter {
3024        async fn init(&mut self) -> Result<(), AdapterError> {
3025            Ok(())
3026        }
3027        async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3028            self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3029            Err((self.make_err)())
3030        }
3031        async fn flush(&self) -> Result<(), AdapterError> {
3032            Ok(())
3033        }
3034        async fn shutdown(&self) -> Result<(), AdapterError> {
3035            Ok(())
3036        }
3037        async fn poll_shard(
3038            &self,
3039            _shard_id: u16,
3040            _from_id: Option<&str>,
3041            _limit: usize,
3042        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3043            Ok(crate::adapter::ShardPollResult::empty())
3044        }
3045        fn name(&self) -> &'static str {
3046            "counting_err"
3047        }
3048        async fn is_healthy(&self) -> bool {
3049            true
3050        }
3051    }
3052
3053    /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
3054    /// retried every error variant, ignoring `AdapterError::is_retryable`.
3055    /// A non-retryable error (Connection / Fatal / Serialization)
3056    /// should now drop the batch immediately rather than burn the
3057    /// retry budget on something that cannot succeed.
3058    #[tokio::test(start_paused = true)]
3059    async fn dispatch_batch_skips_retries_on_non_retryable_error() {
3060        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
3061        let adapter = CountingErrAdapter {
3062            calls: calls.clone(),
3063            make_err: Box::new(|| AdapterError::Connection("refused".into())),
3064        };
3065
3066        let batch = Batch::new(0, vec![], 0);
3067        let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 5).await;
3068
3069        assert!(!ok, "non-retryable error must drop batch");
3070        assert_eq!(
3071            calls.load(std::sync::atomic::Ordering::SeqCst),
3072            1,
3073            "Connection error must not be retried; expected exactly 1 on_batch call"
3074        );
3075    }
3076
3077    /// Sanity: a retryable error *does* go through the full retry
3078    /// budget. Without this companion check, the previous test could
3079    /// pass for the wrong reason (e.g. if dispatch always returned on
3080    /// the first error).
3081    #[tokio::test(start_paused = true)]
3082    async fn dispatch_batch_retries_transient_errors() {
3083        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
3084        let adapter = CountingErrAdapter {
3085            calls: calls.clone(),
3086            make_err: Box::new(|| AdapterError::Transient("temp".into())),
3087        };
3088
3089        let batch = Batch::new(0, vec![], 0);
3090        let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 3).await;
3091
3092        assert!(!ok);
3093        // 3 retries + 1 final attempt = 4 total calls.
3094        assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
3095    }
3096
3097    /// Pin the perf contract behind the `Arc<Batch>` change.
3098    ///
3099    /// Pre-fix `dispatch_batch` deep-cloned the events `Vec` on every
3100    /// retry attempt (and the comment in production code already
3101    /// admitted this was wasted on the common `retries == 0` path).
3102    /// Post-fix the dispatcher takes `Arc<Batch>` and each retry is a
3103    /// refcount bump.
3104    ///
3105    /// We pin the contract observationally: an adapter that fails 3
3106    /// times then succeeds receives the *same* `Arc<Batch>` pointer
3107    /// on every call. A regression that ever rebuilt the Batch
3108    /// (e.g. a future refactor that does `Arc::new(batch.as_ref().clone())`)
3109    /// would surface here as distinct `Arc::as_ptr` values.
3110    #[tokio::test(start_paused = true)]
3111    async fn dispatch_batch_retries_share_the_same_arc_allocation() {
3112        // Identity-record `Arc::as_ptr(&batch)` per call via the
3113        // batch's strong-count snapshot. We cast to `usize` so the
3114        // adapter can stay `Send + Sync` without an unsafe impl —
3115        // the value is just an opaque identity stamp.
3116        struct PointerRecordingAdapter {
3117            seen: Arc<parking_lot::Mutex<Vec<usize>>>,
3118            fail_first_n: u32,
3119            calls: Arc<std::sync::atomic::AtomicU32>,
3120        }
3121
3122        #[async_trait::async_trait]
3123        impl crate::adapter::Adapter for PointerRecordingAdapter {
3124            async fn init(&mut self) -> Result<(), AdapterError> {
3125                Ok(())
3126            }
3127            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3128                self.seen.lock().push(Arc::as_ptr(&batch) as usize);
3129                let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3130                if n < self.fail_first_n {
3131                    Err(AdapterError::Transient("retry me".into()))
3132                } else {
3133                    Ok(())
3134                }
3135            }
3136            async fn flush(&self) -> Result<(), AdapterError> {
3137                Ok(())
3138            }
3139            async fn shutdown(&self) -> Result<(), AdapterError> {
3140                Ok(())
3141            }
3142            async fn poll_shard(
3143                &self,
3144                _shard_id: u16,
3145                _from_id: Option<&str>,
3146                _limit: usize,
3147            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3148                Ok(crate::adapter::ShardPollResult::empty())
3149            }
3150            fn name(&self) -> &'static str {
3151                "pointer-recording"
3152            }
3153        }
3154
3155        let seen = Arc::new(parking_lot::Mutex::new(Vec::new()));
3156        let adapter = PointerRecordingAdapter {
3157            seen: Arc::clone(&seen),
3158            fail_first_n: 3,
3159            calls: Arc::new(std::sync::atomic::AtomicU32::new(0)),
3160        };
3161        let batch = Arc::new(Batch::new(0, vec![], 0));
3162        let original_ptr = Arc::as_ptr(&batch) as usize;
3163
3164        let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
3165        assert!(ok, "adapter accepts on 4th try; dispatch must succeed");
3166
3167        let pointers = seen.lock().clone();
3168        assert_eq!(
3169            pointers.len(),
3170            4,
3171            "expected 3 failures + 1 success = 4 on_batch calls",
3172        );
3173        // Every call observed the SAME Arc target. A regression that
3174        // deep-clones would have produced four distinct pointers.
3175        for (i, p) in pointers.iter().enumerate() {
3176            assert_eq!(
3177                *p, original_ptr,
3178                "attempt {i} saw a different Arc<Batch>; dispatch_batch deep-cloned",
3179            );
3180        }
3181    }
3182
3183    /// Counting adapter that records the number of events delivered via
3184    /// `on_batch`. Used by shutdown-durability tests below.
3185    struct CountingAdapter {
3186        received: Arc<AtomicU64>,
3187    }
3188
3189    #[async_trait::async_trait]
3190    impl crate::adapter::Adapter for CountingAdapter {
3191        async fn init(&mut self) -> Result<(), AdapterError> {
3192            Ok(())
3193        }
3194        async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3195            self.received
3196                .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
3197            Ok(())
3198        }
3199        async fn flush(&self) -> Result<(), AdapterError> {
3200            Ok(())
3201        }
3202        async fn shutdown(&self) -> Result<(), AdapterError> {
3203            Ok(())
3204        }
3205        async fn poll_shard(
3206            &self,
3207            _shard_id: u16,
3208            _from_id: Option<&str>,
3209            _limit: usize,
3210        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3211            Ok(crate::adapter::ShardPollResult::empty())
3212        }
3213        fn name(&self) -> &'static str {
3214            "counting"
3215        }
3216        async fn is_healthy(&self) -> bool {
3217            true
3218        }
3219    }
3220
3221    /// `retry_backoff` exponentially grows the base delay
3222    /// per attempt and adds per-(shard, attempt) jitter to
3223    /// decorrelate retries across shards. Pin both invariants:
3224    /// monotonic growth on the base, and jitter that produces
3225    /// different outputs for different shard ids.
3226    #[test]
3227    fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
3228        // Shard 0 attempt 0..6: base ms 100, 200, 400, 800, 1600,
3229        // 3200, 3200 (cap). Plus ±25% jitter.
3230        let s0_a0 = retry_backoff(0, 0).as_millis();
3231        let s0_a1 = retry_backoff(0, 1).as_millis();
3232        let s0_a4 = retry_backoff(0, 4).as_millis();
3233        let s0_a5 = retry_backoff(0, 5).as_millis();
3234        let s0_a6 = retry_backoff(0, 6).as_millis();
3235
3236        // Bounds: each attempt's base is in `[base*0.75, base*1.25)`.
3237        assert!((75..=125).contains(&s0_a0));
3238        assert!((150..=250).contains(&s0_a1));
3239        assert!((1200..=2000).contains(&s0_a4));
3240        assert!((2400..=4000).contains(&s0_a5));
3241        // Cap at attempt=5: attempt 6 must NOT exceed attempt 5's
3242        // upper bound.
3243        assert!(
3244            s0_a6 <= 4000,
3245            "attempt > 5 must cap at the attempt-5 base; got {}ms",
3246            s0_a6
3247        );
3248
3249        // Jitter property: different shards at the same
3250        // attempt land on different backoffs. Sample 16 distinct
3251        // shard ids and assert at least 4 unique backoff values.
3252        //
3253        // The bound is deliberately loose (4 / 16) because
3254        // `DefaultHasher`'s exact distribution is **not stable**
3255        // across Rust toolchain versions — a tighter check (e.g.
3256        // ≥ 8) would empirically pass on every toolchain we test
3257        // against today, but a future stdlib change to the hasher
3258        // could shift the distribution and flake CI for a property
3259        // (decorrelation across shards) that doesn't actually
3260        // depend on a high collision-resistance bar. Asserting
3261        // ≥ 4 unique values out of 16 is enough to catch a real
3262        // regression (e.g. accidentally hashing only `attempt`
3263        // and not `shard_id` would collapse all 16 to a single
3264        // value) while staying robust to hasher-distribution
3265        // drift.
3266        use std::collections::HashSet;
3267        let s_attempt2: HashSet<u128> = (0u16..16)
3268            .map(|s| retry_backoff(s, 2).as_millis())
3269            .collect();
3270        assert!(
3271            s_attempt2.len() >= 4,
3272            "jitter must decorrelate retries across shards; \
3273             only {} unique backoffs across 16 shards",
3274            s_attempt2.len()
3275        );
3276    }
3277
3278    /// CR-23: pin that `EventBus::shutdown` actually invokes the
3279    /// adapter's `flush()` and `shutdown()` methods. The existing
3280    /// `sdk/tests/shutdown_regression.rs` covers the
3281    /// "shutdown runs even with outstanding Arc clones" property
3282    /// using a memory adapter whose `flush`/`shutdown` are no-ops
3283    /// — so a regression that elided the adapter calls would still
3284    /// pass. This test uses a recording adapter that increments
3285    /// per-method counters; we assert flush AND shutdown both fired
3286    /// exactly once after a clean `bus.shutdown().await`.
3287    ///
3288    /// The fix routes `Net::shutdown` through
3289    /// `shutdown_via_ref`, which in turn calls
3290    /// `self.adapter.flush()` and `self.adapter.shutdown()` once
3291    /// each. CR-23 pins this contract at the bus layer so an
3292    /// inadvertent regression at the SDK or adapter wrapper layer
3293    /// can be caught without an integration setup.
3294    #[tokio::test]
3295    async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
3296        struct RecordingAdapter {
3297            on_batch_calls: Arc<AtomicU64>,
3298            flush_calls: Arc<AtomicU64>,
3299            shutdown_calls: Arc<AtomicU64>,
3300        }
3301
3302        #[async_trait::async_trait]
3303        impl crate::adapter::Adapter for RecordingAdapter {
3304            async fn init(&mut self) -> Result<(), AdapterError> {
3305                Ok(())
3306            }
3307            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3308                self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
3309                Ok(())
3310            }
3311            async fn flush(&self) -> Result<(), AdapterError> {
3312                self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
3313                Ok(())
3314            }
3315            async fn shutdown(&self) -> Result<(), AdapterError> {
3316                self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
3317                Ok(())
3318            }
3319            async fn poll_shard(
3320                &self,
3321                _shard_id: u16,
3322                _from_id: Option<&str>,
3323                _limit: usize,
3324            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3325                Ok(crate::adapter::ShardPollResult::empty())
3326            }
3327            fn name(&self) -> &'static str {
3328                "cr23-recording"
3329            }
3330            async fn is_healthy(&self) -> bool {
3331                true
3332            }
3333        }
3334
3335        let on_batch = Arc::new(AtomicU64::new(0));
3336        let flush = Arc::new(AtomicU64::new(0));
3337        let shutdown = Arc::new(AtomicU64::new(0));
3338        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
3339            on_batch_calls: on_batch.clone(),
3340            flush_calls: flush.clone(),
3341            shutdown_calls: shutdown.clone(),
3342        });
3343
3344        let config = EventBusConfig::builder()
3345            .num_shards(2)
3346            .ring_buffer_capacity(1024)
3347            .build()
3348            .unwrap();
3349        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3350
3351        // Drive a small burst so on_batch fires at least once —
3352        // pins that the adapter is wired up correctly. The
3353        // load-bearing assertions below are on flush and shutdown.
3354        for i in 0..16 {
3355            let _ = bus.ingest(Event::new(json!({"i": i})));
3356        }
3357
3358        // Pre-CR-23 a regression that elided one of these would
3359        // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3360        // because the memory adapter's flush/shutdown are no-ops.
3361        // Here the recording adapter makes the contract observable.
3362        bus.shutdown().await.unwrap();
3363
3364        assert!(
3365            on_batch.load(AtomicOrdering::SeqCst) > 0,
3366            "sanity: on_batch must have fired at least once"
3367        );
3368        assert_eq!(
3369            flush.load(AtomicOrdering::SeqCst),
3370            1,
3371            "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3372        );
3373        assert_eq!(
3374            shutdown.load(AtomicOrdering::SeqCst),
3375            1,
3376            "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3377        );
3378    }
3379
3380    /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3381    /// CAS loses (because a first caller already flipped the
3382    /// `shutdown` flag) and whose deadline elapses BEFORE the
3383    /// first caller sets `shutdown_completed=true` returns
3384    /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3385    ///
3386    /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3387    /// the CAS race had no way to distinguish "first caller
3388    /// finished shutdown" from "deadline timed out mid-shutdown."
3389    /// Under a slow adapter (`adapter_timeout` default 30s >
3390    /// the 10s spin deadline), the second caller silently saw
3391    /// `Ok` while the bus was still mid-shutdown — letting
3392    /// subsequent code observe a partially-shut-down bus.
3393    ///
3394    /// We use a slow first caller (sleeps inside `flush()`)
3395    /// and override the spin deadline to a few ms so the test
3396    /// runs fast.
3397    #[tokio::test]
3398    async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3399        struct SlowFlushAdapter {
3400            // Block flush() for this long. The first caller
3401            // gets stuck here while the second caller's spin
3402            // deadline elapses.
3403            flush_delay: std::time::Duration,
3404        }
3405
3406        #[async_trait::async_trait]
3407        impl crate::adapter::Adapter for SlowFlushAdapter {
3408            async fn init(&mut self) -> Result<(), AdapterError> {
3409                Ok(())
3410            }
3411            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3412                Ok(())
3413            }
3414            async fn flush(&self) -> Result<(), AdapterError> {
3415                tokio::time::sleep(self.flush_delay).await;
3416                Ok(())
3417            }
3418            async fn shutdown(&self) -> Result<(), AdapterError> {
3419                Ok(())
3420            }
3421            async fn poll_shard(
3422                &self,
3423                _shard_id: u16,
3424                _from_id: Option<&str>,
3425                _limit: usize,
3426            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3427                Ok(crate::adapter::ShardPollResult::empty())
3428            }
3429            fn name(&self) -> &'static str {
3430                "cr25-slow-flush"
3431            }
3432            async fn is_healthy(&self) -> bool {
3433                true
3434            }
3435        }
3436
3437        // Cubic P2: serialize access to the global deadline
3438        // override so concurrent tests don't interfere. Hold the
3439        // guard until the override is reset to 0 below.
3440        let _override_guard = super::shutdown_deadline_override_lock().await;
3441
3442        // Override the second-caller spin deadline to a short
3443        // value so the test doesn't wall-clock-wait 10s. Production
3444        // builds use the 10s default (the cfg(test) override is
3445        // compiled out).
3446        super::set_shutdown_via_ref_spin_deadline_for_test(50);
3447
3448        let config = EventBusConfig::builder()
3449            .num_shards(2)
3450            .ring_buffer_capacity(1024)
3451            .adapter_timeout(std::time::Duration::from_secs(5))
3452            .build()
3453            .unwrap();
3454        // First caller's flush() sleeps 500ms — far longer than
3455        // the 50ms spin deadline.
3456        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3457            flush_delay: std::time::Duration::from_millis(500),
3458        });
3459        let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3460
3461        // Spawn the FIRST caller — it wins the CAS and proceeds
3462        // into the slow flush. We don't await it; we want it
3463        // running in parallel.
3464        let bus_first = Arc::clone(&bus);
3465        let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3466
3467        // Cubic P2: poll `is_shutdown()` until the first caller
3468        // has set the flag, instead of a fixed sleep. This makes
3469        // the test scheduler-independent — we proceed as soon as
3470        // the first caller has won the CAS, regardless of how
3471        // tokio happens to schedule things. Bounded by a 1s
3472        // timeout so a regression that prevents `shutdown_via_ref`
3473        // from setting the flag doesn't hang the test.
3474        tokio::time::timeout(std::time::Duration::from_secs(1), async {
3475            while !bus.is_shutdown() {
3476                tokio::task::yield_now().await;
3477            }
3478        })
3479        .await
3480        .expect("first caller did not set shutdown flag within 1s");
3481
3482        // The second caller's CAS will fail; it enters the spin
3483        // and times out at 50ms.
3484        let start = std::time::Instant::now();
3485        let second_result = bus.shutdown_via_ref().await;
3486        let elapsed = start.elapsed();
3487
3488        // Reset the override for other tests, then drop the guard.
3489        super::set_shutdown_via_ref_spin_deadline_for_test(0);
3490
3491        // CR-25 contract: the second caller MUST get a Transient
3492        // error, not a silent Ok.
3493        let err = second_result.expect_err(
3494            "CR-25 regression: second caller MUST surface AdapterError::Transient \
3495             when its deadline elapses, NOT a silent Ok",
3496        );
3497        match err {
3498            AdapterError::Transient(msg) => {
3499                assert!(
3500                    msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3501                    "error message must reference the deadline path; got: {msg}"
3502                );
3503            }
3504            other => panic!("expected Transient, got {:?}", other),
3505        }
3506
3507        // Sanity: the second caller's elapsed time is bounded by
3508        // the override (50ms) + scheduler slop. If this is
3509        // anywhere near the production 10s, the cfg(test)
3510        // override path broke.
3511        assert!(
3512            elapsed < std::time::Duration::from_secs(5),
3513            "second caller took {elapsed:?}; the cfg(test) deadline override \
3514             broke if this is near the 10s production default"
3515        );
3516
3517        // Wait for the first caller to finish. Even though it
3518        // took the slow path, the bus IS shutting down and will
3519        // eventually complete.
3520        let _ = first_caller.await.unwrap();
3521        assert!(bus.is_shutdown_completed());
3522    }
3523
3524    /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3525    /// successfully-ingested event to the adapter before returning.
3526    /// Pins the broader durability contract that the
3527    /// `drain_finalize_ready` gate supports: the drain worker may not
3528    /// finalize until the in-flight wait completes.
3529    ///
3530    /// Tests across many shards with bursts large enough that the
3531    /// drain workers are mid-loop when shutdown begins.
3532    #[tokio::test]
3533    async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3534        let received = Arc::new(AtomicU64::new(0));
3535        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3536            received: received.clone(),
3537        });
3538
3539        let config = EventBusConfig::builder()
3540            .num_shards(4)
3541            .ring_buffer_capacity(4096)
3542            .build()
3543            .unwrap();
3544        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3545
3546        // Drive a sizable burst across all shards. Capacity > burst so
3547        // we don't trip backpressure; every successful Ok must reach
3548        // `on_batch` before shutdown returns.
3549        let total = 10_000usize;
3550        let mut successes = 0u64;
3551        for i in 0..total {
3552            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3553                successes += 1;
3554            }
3555        }
3556
3557        // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3558        // those workers wait on `drain_finalize_ready` after observing
3559        // `shutdown=true`, so any push the producer made before the
3560        // shutdown flag is guaranteed to be in the ring buffer when
3561        // the final sweep runs.
3562        bus.shutdown().await.unwrap();
3563
3564        let delivered = received.load(AtomicOrdering::SeqCst);
3565        assert_eq!(
3566            delivered, successes,
3567            "shutdown stranded events: {successes} ingested successfully, \
3568             only {delivered} reached the adapter"
3569        );
3570    }
3571
3572    /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3573    /// barrier: after it returns successfully, every event the
3574    /// caller successfully ingested before `flush()` was called
3575    /// must have been handed to the adapter via `on_batch`.
3576    /// The previous implementation slept a single `batch.max_delay`
3577    /// after the ring buffers drained, which left a window where
3578    /// events could still be sitting in the per-shard mpsc channel
3579    /// or inside a partially-filled batch awaiting timeout — those
3580    /// events were silently dropped from the flush guarantee.
3581    #[tokio::test]
3582    async fn flush_is_a_delivery_barrier() {
3583        let received = Arc::new(AtomicU64::new(0));
3584        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3585            received: received.clone(),
3586        });
3587
3588        // Use a deliberately *long* batch.max_delay (250ms) so that a
3589        // partially-filled batch sitting in the batch worker's
3590        // pending state would survive past the old single-`max_delay`
3591        // sleep. min_size > burst forces the partial-batch path.
3592        let config = EventBusConfig::builder()
3593            .num_shards(2)
3594            .ring_buffer_capacity(1024)
3595            .batch(crate::config::BatchConfig {
3596                min_size: 1_000,
3597                max_size: 10_000,
3598                max_delay: Duration::from_millis(250),
3599                adaptive: false,
3600                velocity_window: Duration::from_millis(100),
3601            })
3602            .build()
3603            .unwrap();
3604        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3605
3606        // A small burst — far below `min_size`, so the batch worker
3607        // will sit on a partial batch waiting for `max_delay`.
3608        let burst = 50usize;
3609        let mut successes = 0u64;
3610        for i in 0..burst {
3611            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3612                successes += 1;
3613            }
3614        }
3615
3616        // Time the flush call to confirm we waited long enough for
3617        // the partial batch to time out. The previous code slept
3618        // ~10ms total in the post-empty phase; the fix waits up to
3619        // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3620        let t0 = std::time::Instant::now();
3621        bus.flush().await.unwrap();
3622        let elapsed = t0.elapsed();
3623
3624        // After flush returns, every successful ingest must have
3625        // been delivered to the adapter. With the old code this
3626        // assertion would fail: events sit in the partial batch
3627        // until `max_delay` (250ms) elapses, but flush returned
3628        // after only ~10ms.
3629        let delivered = received.load(AtomicOrdering::SeqCst);
3630        assert_eq!(
3631            delivered, successes,
3632            "flush() returned but only {delivered} of {successes} \
3633             events reached the adapter (#16); flush waited {:?}",
3634            elapsed
3635        );
3636
3637        bus.shutdown().await.unwrap();
3638    }
3639
3640    /// Regression (Phase 1): when configured with a
3641    /// persistent `producer_nonce_path`, two bus instances launched
3642    /// against the same path stamp the SAME nonce on every emitted
3643    /// batch. JetStream / Redis adapters key dedup on this nonce, so
3644    /// a producer that crashed mid-batch and restarted (within the
3645    /// backend's dedup window) issues retries with the same msg-ids
3646    /// and the backend correctly recognizes them as duplicates.
3647    ///
3648    /// Pre-fix the per-process nonce regenerated on every startup,
3649    /// so post-crash retries wrote NEW msg-ids and the backend
3650    /// persisted the partial-batch's accepted half twice.
3651    #[tokio::test]
3652    async fn persistent_producer_nonce_survives_bus_restart() {
3653        // Use a per-test temp file so concurrent runs don't collide.
3654        let mut nonce_path = std::env::temp_dir();
3655        let pid = std::process::id();
3656        let nanos = std::time::SystemTime::now()
3657            .duration_since(std::time::UNIX_EPOCH)
3658            .map(|d| d.as_nanos())
3659            .unwrap_or(0);
3660        nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3661
3662        let make_config = |path: &std::path::Path| {
3663            EventBusConfig::builder()
3664                .num_shards(1)
3665                .ring_buffer_capacity(1024)
3666                .producer_nonce_path(path)
3667                .build()
3668                .unwrap()
3669        };
3670
3671        // First bus: ingest one event. Read its nonce off the
3672        // adapter-bound batch via a recording adapter.
3673        struct NonceRecordingAdapter {
3674            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3675        }
3676        #[async_trait::async_trait]
3677        impl crate::adapter::Adapter for NonceRecordingAdapter {
3678            async fn init(&mut self) -> Result<(), AdapterError> {
3679                Ok(())
3680            }
3681            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3682                *self.nonce.lock() = Some(batch.process_nonce);
3683                Ok(())
3684            }
3685            async fn flush(&self) -> Result<(), AdapterError> {
3686                Ok(())
3687            }
3688            async fn shutdown(&self) -> Result<(), AdapterError> {
3689                Ok(())
3690            }
3691            async fn poll_shard(
3692                &self,
3693                _: u16,
3694                _: Option<&str>,
3695                _: usize,
3696            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3697                Ok(crate::adapter::ShardPollResult::empty())
3698            }
3699            fn name(&self) -> &'static str {
3700                "nonce-recording"
3701            }
3702        }
3703
3704        let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3705        {
3706            let bus = EventBus::new_with_adapter(
3707                make_config(&nonce_path),
3708                Box::new(NonceRecordingAdapter {
3709                    nonce: nonce_first_run.clone(),
3710                }),
3711            )
3712            .await
3713            .unwrap();
3714            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3715            bus.flush().await.unwrap();
3716            bus.shutdown().await.unwrap();
3717        }
3718
3719        let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3720        {
3721            let bus = EventBus::new_with_adapter(
3722                make_config(&nonce_path),
3723                Box::new(NonceRecordingAdapter {
3724                    nonce: nonce_second_run.clone(),
3725                }),
3726            )
3727            .await
3728            .unwrap();
3729            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3730            bus.flush().await.unwrap();
3731            bus.shutdown().await.unwrap();
3732        }
3733
3734        let n_a = nonce_first_run
3735            .lock()
3736            .expect("first bus must have dispatched a batch");
3737        let n_b = nonce_second_run
3738            .lock()
3739            .expect("second bus must have dispatched a batch");
3740        assert_eq!(
3741            n_a, n_b,
3742            "two bus instances against the same producer_nonce_path \
3743             must stamp the same nonce — pre-fix this regenerated on \
3744             every restart and JetStream's dedup window saw new \
3745             msg-ids as fresh batches",
3746        );
3747
3748        // Cleanup.
3749        let _ = std::fs::remove_file(&nonce_path);
3750    }
3751
3752    /// Pin that ALL spawn sites — both the static initial-shard
3753    /// loop in `new_with_adapter` and the dynamic-add path in
3754    /// `add_shard_internal` — clone the bus's loaded
3755    /// `producer_nonce` correctly. Pre-#56 there was no nonce
3756    /// concept at the bus layer; if any future refactor drops the
3757    /// `producer_nonce: self.producer_nonce` line from one of the
3758    /// spawn sites (or stops loading the persistent path), the
3759    /// post-scale-up shard's batches would carry a different nonce
3760    /// and JetStream's cross-restart dedup would silently break for
3761    /// events ingested into the dynamic shard. Pin all observed
3762    /// batches across the static + dynamic shards share the bus's
3763    /// nonce.
3764    #[tokio::test]
3765    async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3766        let mut nonce_path = std::env::temp_dir();
3767        let pid = std::process::id();
3768        let nanos = std::time::SystemTime::now()
3769            .duration_since(std::time::UNIX_EPOCH)
3770            .map(|d| d.as_nanos())
3771            .unwrap_or(0);
3772        nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3773
3774        struct CollectingAdapter {
3775            nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3776        }
3777        #[async_trait::async_trait]
3778        impl crate::adapter::Adapter for CollectingAdapter {
3779            async fn init(&mut self) -> Result<(), AdapterError> {
3780                Ok(())
3781            }
3782            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3783                self.nonces.lock().push(batch.process_nonce);
3784                Ok(())
3785            }
3786            async fn flush(&self) -> Result<(), AdapterError> {
3787                Ok(())
3788            }
3789            async fn shutdown(&self) -> Result<(), AdapterError> {
3790                Ok(())
3791            }
3792            async fn poll_shard(
3793                &self,
3794                _: u16,
3795                _: Option<&str>,
3796                _: usize,
3797            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3798                Ok(crate::adapter::ShardPollResult::empty())
3799            }
3800            fn name(&self) -> &'static str {
3801                "collecting"
3802            }
3803        }
3804
3805        let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3806        let policy = ScalingPolicy {
3807            min_shards: 1,
3808            max_shards: 8,
3809            cooldown: Duration::from_nanos(1),
3810            ..Default::default()
3811        };
3812        let config = EventBusConfig::builder()
3813            .num_shards(2)
3814            .ring_buffer_capacity(1024)
3815            .scaling(policy)
3816            .producer_nonce_path(&nonce_path)
3817            .build()
3818            .unwrap();
3819
3820        let bus = EventBus::new_with_adapter(
3821            config,
3822            Box::new(CollectingAdapter {
3823                nonces: nonces.clone(),
3824            }),
3825        )
3826        .await
3827        .unwrap();
3828
3829        // Drive the two static shards.
3830        for i in 0..200u64 {
3831            let _ = bus.ingest(Event::new(json!({"i": i})));
3832        }
3833        bus.flush().await.unwrap();
3834
3835        // Add a dynamic shard and drive it too.
3836        let _ = bus.manual_scale_up(1).await.unwrap();
3837        for i in 200..400u64 {
3838            let _ = bus.ingest(Event::new(json!({"i": i})));
3839        }
3840        bus.flush().await.unwrap();
3841
3842        bus.shutdown().await.unwrap();
3843
3844        let observed = nonces.lock().clone();
3845        assert!(
3846            !observed.is_empty(),
3847            "expected the adapter to have observed at least one batch",
3848        );
3849        let first = observed[0];
3850        for (i, &n) in observed.iter().enumerate() {
3851            assert_eq!(
3852                n, first,
3853                "batch {i} stamped a different nonce ({n:#x}) than the first \
3854                 batch ({first:#x}) — at least one spawn site (initial-shard \
3855                 loop or `add_shard_internal`) failed to inherit the bus's \
3856                 producer_nonce",
3857            );
3858        }
3859
3860        let _ = std::fs::remove_file(&nonce_path);
3861    }
3862
3863    /// Regression guard for the `(process_nonce, shard_id,
3864    /// sequence_start, i)` JetStream / Redis msg-id construction.
3865    /// Within one bus instance, `sequence_start` per shard MUST be
3866    /// strictly monotonic across batches AND every two adjacent
3867    /// batches `n` and `n+1` from the same shard MUST satisfy
3868    /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3869    /// no overlap. A regression here breaks the at-most-once dedup
3870    /// every persistent adapter relies on: gaps reuse a `(shard,
3871    /// seq, i)` tuple after the dedup window closes, and overlap
3872    /// silently overlays a later batch on an earlier one's slot.
3873    ///
3874    /// The cross-restart variant (persistent `next_sequence` across
3875    /// process boots) is feature-shaped, not a bug fix — without
3876    /// persistence, restart relies on `process_nonce` rotating to
3877    /// disjoin the msg-id namespace (pinned by
3878    /// `persistent_producer_nonce_survives_bus_restart`). This test
3879    /// pins the within-process invariant the persistent nonce
3880    /// builds on.
3881    #[tokio::test]
3882    async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3883        struct ShardSeqRecorder {
3884            batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3885        }
3886        #[async_trait::async_trait]
3887        impl crate::adapter::Adapter for ShardSeqRecorder {
3888            async fn init(&mut self) -> Result<(), AdapterError> {
3889                Ok(())
3890            }
3891            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3892                self.batches.lock().push((
3893                    batch.shard_id,
3894                    batch.sequence_start,
3895                    batch.events.len(),
3896                ));
3897                Ok(())
3898            }
3899            async fn flush(&self) -> Result<(), AdapterError> {
3900                Ok(())
3901            }
3902            async fn shutdown(&self) -> Result<(), AdapterError> {
3903                Ok(())
3904            }
3905            async fn poll_shard(
3906                &self,
3907                _: u16,
3908                _: Option<&str>,
3909                _: usize,
3910            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3911                Ok(crate::adapter::ShardPollResult::empty())
3912            }
3913            fn name(&self) -> &'static str {
3914                "shard-seq-recorder"
3915            }
3916        }
3917
3918        let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3919        let config = EventBusConfig::builder()
3920            .num_shards(4)
3921            .ring_buffer_capacity(1024)
3922            .build()
3923            .unwrap();
3924        let bus = EventBus::new_with_adapter(
3925            config,
3926            Box::new(ShardSeqRecorder {
3927                batches: batches.clone(),
3928            }),
3929        )
3930        .await
3931        .unwrap();
3932
3933        // Three drive-then-flush rounds so each shard sees multiple
3934        // batches; the across-batch monotonicity invariant is what
3935        // we're pinning, so a single batch per shard wouldn't
3936        // exercise it.
3937        for round in 0..3u64 {
3938            for i in 0..200u64 {
3939                bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3940            }
3941            bus.flush().await.unwrap();
3942        }
3943        bus.shutdown().await.unwrap();
3944
3945        let observed = batches.lock().clone();
3946        assert!(
3947            !observed.is_empty(),
3948            "expected the adapter to have observed at least one batch",
3949        );
3950
3951        // Bucket batches per shard, preserving dispatch order.
3952        let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3953            std::collections::HashMap::new();
3954        for (shard, seq_start, len) in observed {
3955            by_shard.entry(shard).or_default().push((seq_start, len));
3956        }
3957
3958        for (shard, runs) in &by_shard {
3959            assert!(
3960                !runs.is_empty(),
3961                "shard {shard}: must have at least one batch",
3962            );
3963            // First batch of every shard starts at the per-shard
3964            // zero — `BatchWorker::next_sequence` is initialized to
3965            // 0 and the first `flush()` reads it before the
3966            // `saturating_add`. A regression that lazily seeds
3967            // `next_sequence` from a non-zero source (e.g. wall
3968            // clock) would trip here.
3969            assert_eq!(
3970                runs[0].0, 0,
3971                "shard {shard}: first batch must start at sequence 0, got {}",
3972                runs[0].0,
3973            );
3974            for window in runs.windows(2) {
3975                let (prev_start, prev_len) = window[0];
3976                let (next_start, _next_len) = window[1];
3977                let expected_next = prev_start
3978                    .checked_add(prev_len as u64)
3979                    .expect("test bounds keep us well below u64::MAX");
3980                assert!(
3981                    next_start > prev_start,
3982                    "shard {shard}: sequence_start must be strictly monotonic; \
3983                     saw {prev_start} → {next_start}",
3984                );
3985                assert_eq!(
3986                    next_start, expected_next,
3987                    "shard {shard}: gap or overlap in sequence_start; \
3988                     prev=({prev_start}, len={prev_len}) → next={next_start}, \
3989                     expected {expected_next}. A gap reuses (shard, seq, i) \
3990                     tuples after the JetStream/Redis dedup window closes; an \
3991                     overlap silently overlays a later batch on an earlier \
3992                     one's slot.",
3993                );
3994            }
3995        }
3996    }
3997
3998    /// Pin the within-process caching contract for the fallback
3999    /// (no-`producer_nonce_path`) path: two bus instances created
4000    /// in the same process see the SAME `batch_process_nonce()`
4001    /// because the helper is `OnceLock`-cached. The
4002    /// "different-across-restarts" semantic is a *process-level*
4003    /// guarantee — restart the process to get a fresh nonce — and
4004    /// is pinned by `persistent_producer_nonce_survives_bus_restart`
4005    /// (which uses a path; the without-path branch of #56 has no
4006    /// cross-restart guarantee by design).
4007    ///
4008    /// Cubic-ai P3: this test was previously named
4009    /// `process_nonce_fallback_differs_across_bus_instances`, which
4010    /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
4011    /// Renamed to match what it actually pins.
4012    #[tokio::test]
4013    async fn process_nonce_fallback_is_cached_within_process() {
4014        struct NonceRecordingAdapter {
4015            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
4016        }
4017        #[async_trait::async_trait]
4018        impl crate::adapter::Adapter for NonceRecordingAdapter {
4019            async fn init(&mut self) -> Result<(), AdapterError> {
4020                Ok(())
4021            }
4022            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
4023                *self.nonce.lock() = Some(batch.process_nonce);
4024                Ok(())
4025            }
4026            async fn flush(&self) -> Result<(), AdapterError> {
4027                Ok(())
4028            }
4029            async fn shutdown(&self) -> Result<(), AdapterError> {
4030                Ok(())
4031            }
4032            async fn poll_shard(
4033                &self,
4034                _: u16,
4035                _: Option<&str>,
4036                _: usize,
4037            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
4038                Ok(crate::adapter::ShardPollResult::empty())
4039            }
4040            fn name(&self) -> &'static str {
4041                "nonce-recording"
4042            }
4043        }
4044
4045        let cfg = || {
4046            EventBusConfig::builder()
4047                .num_shards(1)
4048                .ring_buffer_capacity(1024)
4049                .build()
4050                .unwrap()
4051        };
4052
4053        let n_a = Arc::new(parking_lot::Mutex::new(None));
4054        let n_b = Arc::new(parking_lot::Mutex::new(None));
4055        {
4056            let bus = EventBus::new_with_adapter(
4057                cfg(),
4058                Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
4059            )
4060            .await
4061            .unwrap();
4062            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
4063            bus.flush().await.unwrap();
4064            bus.shutdown().await.unwrap();
4065        }
4066        {
4067            let bus = EventBus::new_with_adapter(
4068                cfg(),
4069                Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
4070            )
4071            .await
4072            .unwrap();
4073            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
4074            bus.flush().await.unwrap();
4075            bus.shutdown().await.unwrap();
4076        }
4077
4078        // Note: in a single-process test BOTH bus instances see the
4079        // same `OnceLock`-cached `batch_process_nonce`, so the
4080        // nonces ARE equal here even though the documented
4081        // semantic is "fresh per process." This test pins the
4082        // cached-within-a-process invariant; the across-PROCESSES
4083        // semantic is exercised by the persistent-nonce test
4084        // above (which is the actually-load-bearing path for the
4085        // persistent-nonce fix).
4086        let n_a = n_a.lock().unwrap();
4087        let n_b = n_b.lock().unwrap();
4088        assert_eq!(
4089            n_a, n_b,
4090            "within one process, batch_process_nonce is OnceLock-cached \
4091             so two bus instances see the same nonce — the \
4092             different-across-restarts contract is process-level, \
4093             pinned via `persistent_producer_nonce_survives_bus_restart`",
4094        );
4095    }
4096
4097    /// Regression: `EventBusStats::batches_dispatched`
4098    /// (and the new `events_dispatched`) must actually increment on
4099    /// every successful adapter dispatch. Pre-fix `batches_dispatched`
4100    /// was declared but never updated, so flush()'s Phase 2 progress
4101    /// gate was constant-zero and early-broke after one window —
4102    /// flake on Windows-class timer resolution. Pin both counters
4103    /// directly here so a future refactor that drops the increment
4104    /// fails this test, not the timing-dependent
4105    /// `flush_is_a_delivery_barrier`.
4106    #[tokio::test]
4107    async fn dispatch_increments_bus_level_event_and_batch_counters() {
4108        let received = Arc::new(AtomicU64::new(0));
4109        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4110            received: received.clone(),
4111        });
4112
4113        let config = EventBusConfig::builder()
4114            .num_shards(2)
4115            .ring_buffer_capacity(1024)
4116            .batch(crate::config::BatchConfig {
4117                min_size: 1,
4118                max_size: 10,
4119                max_delay: Duration::from_millis(10),
4120                adaptive: false,
4121                velocity_window: Duration::from_millis(100),
4122            })
4123            .build()
4124            .unwrap();
4125        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
4126
4127        for i in 0..50 {
4128            bus.ingest(Event::new(json!({"i": i}))).unwrap();
4129        }
4130        bus.flush().await.unwrap();
4131
4132        let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
4133        let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
4134        assert!(
4135            batches > 0,
4136            "batches_dispatched must be > 0 after flush — pre-fix it was \
4137             never incremented anywhere, breaking flush()'s Phase 2 progress gate",
4138        );
4139        assert_eq!(
4140            events, 50,
4141            "events_dispatched must equal the number of events handed to \
4142             the adapter (got {events}, expected 50)",
4143        );
4144
4145        bus.shutdown().await.unwrap();
4146    }
4147
4148    /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
4149    /// still release the drain-finalize gate so detached drain
4150    /// workers can exit instead of parking on the gate until the
4151    /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
4152    /// keeps the `Drop` impl honest if someone refactors the
4153    /// shutdown gates later.
4154    #[tokio::test]
4155    async fn drop_releases_drain_finalize_gate_promptly() {
4156        let config = EventBusConfig::builder()
4157            .num_shards(2)
4158            .ring_buffer_capacity(1024)
4159            .build()
4160            .unwrap();
4161        let bus = EventBus::new(config).await.unwrap();
4162        let drain_gate = bus.drain_finalize_ready.clone();
4163
4164        // Drop without an awaited shutdown.
4165        drop(bus);
4166
4167        // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
4168        // is 10s; if Drop didn't flip the gate, drain workers would
4169        // park for up to that long before exiting.
4170        assert!(
4171            drain_gate.load(AtomicOrdering::Acquire),
4172            "Drop must release `drain_finalize_ready` so detached drain \
4173             workers exit promptly"
4174        );
4175    }
4176
4177    /// The lossy-shutdown reconciliation at L1615-L1635 has TWO
4178    /// arms after the deadline-snapshot fires:
4179    ///
4180    ///   - `actual_drops > 0` → bump `events_dropped`
4181    ///     (already exercised by the deadline path running with
4182    ///     real producers that never recover)
4183    ///   - `actual_drops == 0` → clear the eager
4184    ///     `shutdown_was_lossy = true` because the drain caught
4185    ///     up to every stranded ingest
4186    ///
4187    /// The second arm is the false-positive-alert fix: pre-fix,
4188    /// the `was_lossy` flag stayed `true` against
4189    /// `events_dropped == 0` for any deadline-triggered shutdown
4190    /// whose final drain happened to catch up — operator
4191    /// dashboards alerting on `was_lossy && dropped == 0` saw
4192    /// false positives.
4193    ///
4194    /// We pin the clear-flag arm by:
4195    ///   1. Manually staging `in_flight_ingests = 1` so the
4196    ///      shutdown spin loop sees pending work.
4197    ///   2. Using `tokio::time::advance(6s)` to force the
4198    ///      hardcoded 5-second deadline (the comments at L1399
4199    ///      and L1725 confirm the deadline uses
4200    ///      `tokio::time::Instant` precisely so paused-time
4201    ///      tests can virtualize it).
4202    ///   3. Simulating the stranded producer completing AFTER
4203    ///      the deadline snapshot: bump `events_ingested += 1`
4204    ///      and drop the in-flight counter back to 0. The
4205    ///      post-drain reconciliation reads
4206    ///      `post_deadline_ingests = 1 - 0 = 1 >= stranded = 1`
4207    ///      so `actual_drops == 0` and the arm clears
4208    ///      `shutdown_was_lossy`.
4209    #[tokio::test(start_paused = true)]
4210    async fn lossy_shutdown_reconciliation_clears_was_lossy_when_drain_catches_up() {
4211        let config = EventBusConfig::builder()
4212            .num_shards(1)
4213            .ring_buffer_capacity(1024)
4214            .without_scaling()
4215            .build()
4216            .unwrap();
4217        let bus = Arc::new(
4218            EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
4219                .await
4220                .unwrap(),
4221        );
4222
4223        // Stage one "stranded" in-flight ingest. Real producers
4224        // would have incremented this via try_enter_ingest and
4225        // be mid-call to `shard_manager.ingest`; the test helper
4226        // (defined alongside `EventBus`) holds the counter at 1
4227        // for the duration of the deadline-window.
4228        bus.stage_stranded_ingest(1);
4229
4230        // Spawn shutdown — it'll enter the in-flight spin loop
4231        // and park on `tokio::time::sleep(1ms)` until time
4232        // advances.
4233        let bus_for_shutdown = Arc::clone(&bus);
4234        let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });
4235
4236        // Yield enough times for the shutdown task to enter the
4237        // spin loop. With paused time, the spin's 1ms sleep
4238        // blocks until we advance.
4239        for _ in 0..10 {
4240            tokio::task::yield_now().await;
4241        }
4242
4243        // Advance past the 5s deadline. The next loop iteration
4244        // observes the deadline, captures `(stranded=1,
4245        // ingested=0, dispatched=0)`, sets `was_lossy = true`,
4246        // and breaks.
4247        tokio::time::advance(std::time::Duration::from_secs(6)).await;
4248        for _ in 0..10 {
4249            tokio::task::yield_now().await;
4250        }
4251
4252        // The eager-set was_lossy flag is visible now.
4253        assert!(
4254            bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4255            "deadline must have set was_lossy=true (eagerly)",
4256        );
4257
4258        // Now simulate the "stranded" producer completing AFTER
4259        // the deadline. The helper bumps events_ingested with
4260        // Release ordering before releasing the in-flight slot
4261        // — matching what a real `ingest` call site does, so
4262        // the reconciliation reads the same arithmetic.
4263        bus.complete_stranded_ingest(1);
4264
4265        // Let shutdown finish — its post-drain reconciliation
4266        // now reads ingested_after=1, post_deadline_ingests=1,
4267        // actual_drops=stranded(1)-1=0 → clears was_lossy.
4268        let _ = shutdown_task.await.unwrap();
4269
4270        assert!(
4271            !bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4272            "reconciliation must clear was_lossy when drain catches up to every stranded ingest \
4273             (post_deadline_ingests >= stranded → actual_drops == 0)",
4274        );
4275        // And events_dropped must NOT have been bumped — the
4276        // whole point of the reconciliation arm is that no event
4277        // was actually lost.
4278        assert_eq!(
4279            bus.stats.events_dropped.load(AtomicOrdering::Acquire),
4280            0,
4281            "no events were actually dropped — reconciliation must not bump events_dropped",
4282        );
4283    }
4284
4285    /// Companion to `lossy_shutdown_reconciliation_clears_was_lossy_*`:
4286    /// pins the OTHER arm of the same if/else at L1615-L1619.
4287    /// When drain does NOT catch up (post_deadline_ingests <
4288    /// stranded), `actual_drops > 0` and `events_dropped` is
4289    /// bumped. The `was_lossy = true` flag set eagerly before the
4290    /// reconciliation stays true here — pre-fix the same code
4291    /// path bumped events_dropped += stranded EAGERLY *and*
4292    /// counted the same events as ingested, breaking the
4293    /// `ingested == dispatched + dropped` invariant.
4294    #[tokio::test(start_paused = true)]
4295    async fn lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up() {
4296        let config = EventBusConfig::builder()
4297            .num_shards(1)
4298            .ring_buffer_capacity(1024)
4299            .without_scaling()
4300            .build()
4301            .unwrap();
4302        let bus = Arc::new(
4303            EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
4304                .await
4305                .unwrap(),
4306        );
4307
4308        // Stage two stranded ingests so the actual_drops count
4309        // is meaningful (not just 0 vs 1).
4310        bus.stage_stranded_ingest(2);
4311
4312        let bus_for_shutdown = Arc::clone(&bus);
4313        let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });
4314
4315        for _ in 0..10 {
4316            tokio::task::yield_now().await;
4317        }
4318        tokio::time::advance(std::time::Duration::from_secs(6)).await;
4319        for _ in 0..10 {
4320            tokio::task::yield_now().await;
4321        }
4322
4323        // Drop the in-flight counter to 0 so subsequent shutdown
4324        // bookkeeping doesn't see phantom in-flight work, but do
4325        // NOT bump events_ingested — the producers never
4326        // "completed," so the reconciliation must classify them
4327        // as drops.
4328        bus.release_stranded_ingest(2);
4329
4330        let _ = shutdown_task.await.unwrap();
4331
4332        assert!(
4333            bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4334            "was_lossy must stay true when drain did not catch up",
4335        );
4336        // stranded=2, post_deadline_ingests=0, actual_drops=2.
4337        // We don't pin == 2 exactly because the "known under-count
4338        // window" doc'd at L1595-L1614 acknowledges a ±1 bias
4339        // under racy interleavings; the floor is what matters.
4340        assert!(
4341            bus.stats.events_dropped.load(AtomicOrdering::Acquire) >= 1,
4342            "events_dropped must reflect at least one stranded ingest",
4343        );
4344    }
4345
4346    /// `remove_shard_internal` at L832-L859 dispatches any
4347    /// stranded ring-buffer events through the adapter as a
4348    /// one-shot batch. The normal scale-down path drains the
4349    /// ring via the drain worker before remove fires, so
4350    /// `stranded.is_empty()` on the happy path and L832-L859
4351    /// never runs — but a race window between drain completion
4352    /// and remove (or a manual remove on a shard whose drain
4353    /// worker is parked) leaves events stranded. Pre-fix any
4354    /// stranded events would have been silently dropped; the
4355    /// L832-L859 path now flushes them and bumps both
4356    /// `batches_dispatched` (+1) and `events_dispatched` (+N).
4357    ///
4358    /// We force the stranded-events condition by:
4359    ///   1. Pausing time so the drain worker can't pull from
4360    ///      the ring (its polling sleeps don't tick).
4361    ///   2. Pushing raw events directly into shard 0's ring via
4362    ///      the `with_shard` API.
4363    ///   3. Calling `remove_shard_internal(0)` — its inner
4364    ///      `shard_manager.remove_shard` then returns the
4365    ///      stranded vec, triggering the L832-L859 dispatch.
4366    #[tokio::test(start_paused = true)]
4367    async fn remove_shard_internal_dispatches_stranded_ring_buffer_events() {
4368        let received = Arc::new(AtomicU64::new(0));
4369        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4370            received: Arc::clone(&received),
4371        });
4372        // Keep the default scaling policy: `remove_shard_internal`
4373        // routes through `shard_manager.remove_shard`, which
4374        // requires the dynamic-scaling mapper. `without_scaling()`
4375        // turns the mapper off and the remove fails with
4376        // "Dynamic scaling not enabled". The scaling MONITOR
4377        // task is a separate concern and isn't auto-started by
4378        // `new_with_adapter` — see
4379        // `start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it`
4380        // for the lifecycle contract.
4381        let config = EventBusConfig::builder()
4382            .num_shards(2)
4383            .ring_buffer_capacity(1024)
4384            .build()
4385            .unwrap();
4386        let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
4387
4388        // Yield enough times for the spawned drain workers to
4389        // reach their first `tokio::time::sleep` and park. With
4390        // `start_paused` they can't tick once parked; the gate
4391        // is here so the worker on shard 0 has *already parked*
4392        // before we push events into the ring. Without this, a
4393        // worker that hadn't yet hit its first park could race
4394        // the push and dispatch via the normal drain path
4395        // instead of the L832-L859 stranded path the test exists
4396        // to pin.
4397        for _ in 0..10 {
4398            tokio::task::yield_now().await;
4399        }
4400
4401        // Stage 5 events directly in shard 0's ring. With paused
4402        // time the drain worker can't pull them out — they sit
4403        // there until remove fires.
4404        let stranded_count = 5u64;
4405        let pushed = bus.shard_manager.with_shard(0, |shard| {
4406            for i in 0..stranded_count {
4407                shard
4408                    .try_push_raw(bytes::Bytes::from(format!("stranded-{i}")))
4409                    .unwrap();
4410            }
4411        });
4412        assert!(
4413            pushed.is_some(),
4414            "shard 0 must exist for try_push_raw to land",
4415        );
4416
4417        let batches_before = bus.stats.batches_dispatched.load(AtomicOrdering::Acquire);
4418        let events_before = bus.stats.events_dispatched.load(AtomicOrdering::Acquire);
4419
4420        // Directly invoke the internal — the production path
4421        // routes through manual_scale_down + finalize_draining,
4422        // but the L832-L859 dispatch arm is the same regardless
4423        // of how we got here.
4424        bus.remove_shard_internal(0)
4425            .await
4426            .expect("remove_shard_internal must succeed");
4427
4428        // The CountingAdapter saw the stranded batch.
4429        assert_eq!(
4430            received.load(AtomicOrdering::SeqCst),
4431            stranded_count,
4432            "stranded events must reach the adapter",
4433        );
4434        // Bus stats bumped by exactly the right amounts.
4435        assert_eq!(
4436            bus.stats.batches_dispatched.load(AtomicOrdering::Acquire),
4437            batches_before + 1,
4438            "exactly one stranded batch must increment batches_dispatched",
4439        );
4440        assert_eq!(
4441            bus.stats.events_dispatched.load(AtomicOrdering::Acquire),
4442            events_before + stranded_count,
4443            "events_dispatched must reflect every stranded event",
4444        );
4445
4446        // Clean shutdown so the remaining shard's drain worker
4447        // exits cleanly rather than getting torn down by the
4448        // bus's `Drop` impl (which silently loses any events
4449        // still in the ring buffers / mpsc channels). Mirrors
4450        // the lifecycle hygiene the other paused-time tests in
4451        // this module follow.
4452        //
4453        // The remaining drain worker (shard 1) is parked on
4454        // `tokio::time::sleep`; tokio's paused-time auto-advance
4455        // wakes it once every task is parked on time, but we
4456        // yield-loop here anyway so the JoinHandle await below
4457        // doesn't depend on auto-advance heuristics.
4458        bus.shutdown_via_ref()
4459            .await
4460            .expect("shutdown must succeed under paused time");
4461    }
4462
4463    /// `EventBus::new_with_adapter` must surface a `Fatal` error
4464    /// — naming the configured path — when `producer_nonce_path`
4465    /// points at a location the runtime can't read or create. The
4466    /// path-load failure path at L292-L301 is the single signal
4467    /// operators get that their persistent-nonce wiring is wrong;
4468    /// silently falling back to the per-process nonce would
4469    /// silently degrade the durability guarantee that path was
4470    /// chosen to provide (at-most-once across restart).
4471    #[tokio::test]
4472    async fn new_with_adapter_surfaces_fatal_when_nonce_path_unreadable() {
4473        // Point at a path whose parent directory doesn't exist.
4474        // `load_or_create` has to mkdir-then-write, so a missing
4475        // parent fails on every platform. Anchoring under
4476        // `std::env::temp_dir()` keeps the path well-formed on
4477        // both Windows (`C:\Users\...\Temp\...`) and Linux/macOS
4478        // (`/tmp/...`) — a hardcoded `C:\\…` literal looks like a
4479        // valid filename on POSIX filesystems and the create
4480        // happily succeeds, silently bypassing the failure-arm
4481        // this test exists to pin.
4482        let marker = "net-coverage-nonexistent-parent";
4483        let bogus_path = std::env::temp_dir()
4484            .join(marker)
4485            .join("deeply")
4486            .join("nested")
4487            .join("nonce");
4488        let config = EventBusConfig::builder()
4489            .num_shards(1)
4490            .ring_buffer_capacity(1024)
4491            .producer_nonce_path(&bogus_path)
4492            .build()
4493            .unwrap();
4494        let adapter: Box<dyn crate::adapter::Adapter> =
4495            Box::new(crate::adapter::NoopAdapter::new());
4496
4497        // EventBus doesn't impl Debug, so we can't unwrap_err
4498        // it directly — pattern-match instead.
4499        let result = EventBus::new_with_adapter(config, adapter).await;
4500        match result {
4501            Err(AdapterError::Fatal(msg)) => {
4502                assert!(
4503                    msg.contains("producer-nonce") || msg.contains("nonce"),
4504                    "error must name the failing subsystem: {msg}",
4505                );
4506                // The path must appear in the message so operators
4507                // can grep their logs to the misconfigured config
4508                // field without guessing. The unique marker
4509                // segment is portable across platforms (the temp
4510                // root differs but the joined marker is the same).
4511                assert!(
4512                    msg.contains(marker),
4513                    "error must include the configured path: {msg}",
4514                );
4515            }
4516            Err(other) => panic!("expected Fatal, got {other:?}"),
4517            Ok(_) => panic!("expected Fatal, got Ok"),
4518        }
4519    }
4520
4521    /// `start_scaling_monitor` must be a no-op on a non-scaling
4522    /// bus AND idempotent under repeated calls on a scaling bus.
4523    /// A regression that drops the early-returns would spawn
4524    /// shadow monitor tasks that hold a Weak<EventBus> until they
4525    /// next observe shutdown — pure-overhead duplicate
4526    /// metrics-tick wakeup rates that compete for the same
4527    /// `evaluate_scaling` lock.
4528    #[tokio::test]
4529    async fn start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it() {
4530        // Scaling explicitly disabled: start_scaling_monitor
4531        // must return before touching the slot. (The builder
4532        // default fills in a ScalingPolicy when scaling isn't
4533        // mentioned — `without_scaling()` is the only way to
4534        // force `config.scaling == None`.)
4535        let config = EventBusConfig::builder()
4536            .num_shards(1)
4537            .ring_buffer_capacity(1024)
4538            .without_scaling()
4539            .build()
4540            .unwrap();
4541        let bus = EventBus::new(config).await.unwrap();
4542        let bus = Arc::new(bus);
4543        bus.start_scaling_monitor();
4544        assert!(
4545            bus.scaling_monitor.lock().is_none(),
4546            "non-scaling bus must not install a monitor",
4547        );
4548
4549        // Scaling enabled: first call installs, second is a no-op
4550        // (the existing handle stays put — proves the early-return
4551        // didn't overwrite the slot).
4552        let policy = crate::shard::ScalingPolicy {
4553            min_shards: 2,
4554            max_shards: 4,
4555            ..Default::default()
4556        };
4557        let config = EventBusConfig::builder()
4558            .num_shards(2)
4559            .ring_buffer_capacity(1024)
4560            .scaling(policy)
4561            .build()
4562            .unwrap();
4563        let bus = Arc::new(EventBus::new(config).await.unwrap());
4564        // EventBus::new doesn't auto-start the monitor — we start
4565        // it explicitly so the test owns the lifecycle.
4566        bus.start_scaling_monitor();
4567        let handle_id_after_first = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4568        assert!(handle_id_after_first.is_some(), "first start must install");
4569
4570        // Second call must NOT replace the handle. If the early-
4571        // return is broken, the slot would carry a new handle id.
4572        bus.start_scaling_monitor();
4573        let handle_id_after_second = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4574        assert_eq!(
4575            handle_id_after_first, handle_id_after_second,
4576            "second start_scaling_monitor must NOT replace the running handle",
4577        );
4578    }
4579}