Skip to main content

net/
bus.rs

1//! Main EventBus facade.
2//!
3//! The EventBus provides a unified API for:
4//! - Event ingestion (non-blocking)
5//! - Event consumption (async polling with filtering)
6//! - Lifecycle management
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14
15use crate::adapter::{Adapter, NoopAdapter};
16use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
17use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
18use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
19use crate::event::{Batch, Event, RawEvent};
20use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
21
22#[cfg(feature = "jetstream")]
23use crate::adapter::JetStreamAdapter;
24#[cfg(feature = "net")]
25use crate::adapter::NetAdapter;
26#[cfg(feature = "redis")]
27use crate::adapter::RedisAdapter;
28
29/// The main event bus.
30///
31/// # Example
32///
33/// ```rust,ignore
34/// use net::{EventBus, EventBusConfig, Event};
35///
36/// #[tokio::main]
37/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
38///     let bus = EventBus::new(EventBusConfig::default()).await?;
39///
40///     // Ingest events
41///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
42///
43///     // Poll events
44///     let response = bus.poll(ConsumeRequest::new(100)).await?;
45///
46///     bus.shutdown().await?;
47///     Ok(())
48/// }
49/// ```
50pub struct EventBus {
51    /// Shard manager for parallel ingestion.
52    shard_manager: Arc<ShardManager>,
53    /// Adapter for durable storage.
54    adapter: Arc<dyn Adapter>,
55    /// Poll merger for cross-shard consumption.
56    poll_merger: arc_swap::ArcSwap<PollMerger>,
57    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
58    /// block in `add_shard_internal` / `remove_shard_internal`.
59    /// Without this lock, two callers (e.g. scaling-monitor
60    /// add_shard racing manual_scale_down's remove_shard) read the
61    /// shard ids snapshot at slightly different points and then
62    /// race on the `arc_swap.store`. The write that lands second
63    /// can clobber the more-current view: T1 reads `{0..5}`, T2
64    /// reads `{1..4}`, T2 stores `{1..4}`, T1 stores `{0..5}` —
65    /// the published merger then routes polls to the just-removed
66    /// shard 0 until the next topology change repairs the
67    /// snapshot.
68    poll_merger_swap_lock: parking_lot::Mutex<()>,
69    /// Per-shard worker handles. Stored separately so shutdown can
70    /// await drain workers *before* batch workers — the drain
71    /// worker's final sweep races the batch worker's exit
72    /// otherwise, and any events the drain worker pushes to the
73    /// channel after the batch worker has stopped reading are
74    /// silently lost.
75    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
76    /// Channels for sending batches to workers (shard_id -> sender).
77    batch_senders: parking_lot::RwLock<
78        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
79    >,
80    /// Shutdown flag.
81    shutdown: Arc<AtomicBool>,
82    /// Gate signaling drain workers that the in-flight wait has
83    /// completed and they may safely run their final ring-buffer
84    /// sweep. Distinct from `shutdown` because the drain worker
85    /// observing `shutdown=true` alone is not enough: a producer
86    /// that read `shutdown=false` may still be mid-push, and if the
87    /// drain worker rushes through its final sweep before that push
88    /// is visible the event is stranded. `shutdown()` sets this
89    /// after waiting for `in_flight_ingests==0`, at which point the
90    /// Acquire load on the drain side synchronizes-with the Release
91    /// store here, transitively chaining through the SeqCst
92    /// in-flight handshake to make every observed-pre-shutdown push
93    /// visible to the drain worker's subsequent `pop_batch_into`.
94    drain_finalize_ready: Arc<AtomicBool>,
95    /// In-flight ingest counter. Incremented before each ingest's
96    /// shutdown check and decremented after the push completes (or
97    /// bails). `shutdown()` waits for this to drop to zero *after*
98    /// setting `shutdown=true` and *before* setting
99    /// `drain_finalize_ready=true` so no producer is mid-push when
100    /// the drain workers do their final sweep — closing the race
101    /// where a producer that observed `shutdown=false` could push
102    /// *after* the drain worker's last `pop_batch_into` returned
103    /// zero, leaving the event stranded in the ring buffer.
104    /// Pre-fix this was `AtomicU32`. A 4-billion-in-flight
105    /// wrap is not realistic in production, but the counter
106    /// participates in the shutdown protocol — a wrap to 0 would
107    /// trick the wait-for-zero loop into thinking shutdown was
108    /// safe to proceed while producers were still mid-push.
109    /// Widened to `AtomicU64` so the wrap is astronomical
110    /// (1.8e19 in-flight ingests).
111    in_flight_ingests: AtomicU64,
112    /// Set to `true` after `shutdown()` runs to completion. `Drop`
113    /// uses this to detect "dropped without an awaited shutdown" —
114    /// in that case events still in the ring buffers / mpsc channels
115    /// are silently lost (see `Drop` impl).
116    shutdown_completed: AtomicBool,
117    /// Configuration.
118    config: EventBusConfig,
119    /// Statistics.
120    stats: Arc<EventBusStats>,
121    /// Producer nonce. Loaded from
122    /// `config.producer_nonce_path` on startup when the path is
123    /// configured; otherwise falls back to the per-process default
124    /// from `event::batch_process_nonce`. Stamped on every batch
125    /// the bus emits — the worker spawn copies this u64 into
126    /// `BatchWorkerParams::producer_nonce`, and
127    /// `remove_shard_internal`'s stranded-flush uses it via
128    /// `Batch::with_nonce`.
129    producer_nonce: u64,
130    /// Scaling monitor task handle.
131    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
132}
133
/// Worker handles for a single shard. The drain worker pumps
/// events from the ring buffer into an mpsc channel; the batch
/// worker reads from that channel and dispatches to the adapter.
/// Shutdown ordering is load-bearing — see `EventBus::shutdown`.
struct ShardWorkers {
    /// Batch worker task (channel → adapter). The activate-failure
    /// rollback in `add_shard_internal_with_cooldown_policy` awaits
    /// this only *after* `drain`, so events the drain worker sends
    /// on its way out are received and flushed before this exits.
    batch: JoinHandle<()>,
    /// Drain worker task (ring buffer → channel).
    drain: JoinHandle<()>,
    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
    /// Teardown paths (`remove_shard_internal`, the activate-failure
    /// rollback) read this AFTER awaiting `batch` to learn the
    /// worker's final post-flush sequence. They then use it as the
    /// `sequence_start` for the stranded-ring-buffer flush, so the
    /// stranded msg-ids fall strictly past every msg-id the worker
    /// emitted. Without this, JetStream dedup would silently drop the
    /// stranded batch when both used 0.
    next_sequence: Arc<AtomicU64>,
}
150
151/// RAII guard for an in-flight ingest. Decrements
152/// `in_flight_ingests` on drop so `shutdown()` can wait for the
153/// counter to reach zero.
154struct IngestGuard<'a> {
155    bus: &'a EventBus,
156}
157
158impl Drop for IngestGuard<'_> {
159    fn drop(&mut self) {
160        self.bus
161            .in_flight_ingests
162            .fetch_sub(1, AtomicOrdering::SeqCst);
163    }
164}
165
/// Event bus statistics.
///
/// Every field is an independent atomic counter/flag, so a snapshot
/// read field by field is not guaranteed to be mutually consistent
/// while the bus is running.
#[derive(Debug, Default)]
pub struct EventBusStats {
    /// Total events ingested.
    pub events_ingested: AtomicU64,
    /// Events dropped due to backpressure, plus events knowingly lost
    /// on a teardown path, such as stranded ring-buffer events that
    /// could not be flushed to the adapter.
    pub events_dropped: AtomicU64,
    /// Batches successfully dispatched to the adapter.
    ///
    /// Incremented by the BatchWorker after each successful
    /// `dispatch_batch`, and by the stranded-event flushes in
    /// `remove_shard_internal` and the add-shard activate-failure
    /// rollback. `flush()`'s Phase 2 progress probe reads it as "did
    /// the BatchWorker make progress during this `max_delay` window?".
    /// Before this counter was wired up, the probe always observed
    /// `0 == 0` and gave up after a single window. On coarse
    /// (Windows-class) timers that regularly lost the race against
    /// the BatchWorker's first `recv_timeout`.
    pub batches_dispatched: AtomicU64,
    /// Total events dispatched to the adapter (sum of batch lengths
    /// from successful `on_batch` calls). Companion to
    /// `batches_dispatched`. By the time `flush()` returns,
    /// `events_dispatched + events_dropped == events_ingested`. FFI
    /// consumers can also use this to monitor end-to-end delivery.
    pub events_dispatched: AtomicU64,
    /// Set to `true` when events were knowingly lost during teardown:
    ///
    /// - `shutdown()` / `shutdown_via_ref()` had to proceed past the
    ///   in-flight-ingest grace deadline (5 s) with producers still
    ///   mid-push (the stranded events are also counted in
    ///   `events_dropped`); or
    /// - an add-shard activate-failure rollback had to skip its
    ///   stranded-event flush because a worker did not exit in time
    ///   (those events are also counted in `events_dropped`).
    ///
    /// `shutdown` returns `Ok(())` in either case. Callers that need to
    /// tell a clean shutdown from a lossy one check this flag in
    /// `bus.stats()` afterwards. That is only possible on the
    /// `shutdown_via_ref` path, which does not consume the bus.
    pub shutdown_was_lossy: AtomicBool,
}
205
206impl EventBus {
207    /// Create a new event bus with the given configuration.
208    pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
209        // Create adapter from config
210        let adapter: Box<dyn Adapter> = match &config.adapter {
211            AdapterConfig::Noop => Box::new(NoopAdapter::new()),
212            #[cfg(feature = "redis")]
213            AdapterConfig::Redis(redis_config) => {
214                Box::new(RedisAdapter::new(redis_config.clone())?)
215            }
216            #[cfg(feature = "jetstream")]
217            AdapterConfig::JetStream(js_config) => {
218                Box::new(JetStreamAdapter::new(js_config.clone())?)
219            }
220            #[cfg(feature = "net")]
221            AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
222        };
223
224        Self::new_with_adapter(config, adapter).await
225    }
226
227    /// Create a new event bus with a caller-supplied adapter.
228    ///
229    /// `config.adapter` is ignored — the supplied `adapter` is used
230    /// instead. Useful for tests that need to observe or inject
231    /// behavior at the adapter boundary (e.g. a counting adapter
232    /// for end-to-end delivery assertions, a flaky adapter for
233    /// retry-path coverage).
234    pub async fn new_with_adapter(
235        config: EventBusConfig,
236        mut adapter: Box<dyn Adapter>,
237    ) -> Result<Self, AdapterError> {
238        config
239            .validate()
240            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
241
242        // Initialize adapter (with timeout to prevent hanging on unreachable backends)
243        tokio::time::timeout(config.adapter_timeout, adapter.init())
244            .await
245            .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
246        let adapter: Arc<dyn Adapter> = Arc::from(adapter);
247
248        // Create shard manager (with or without dynamic scaling)
249        let shard_manager = if let Some(ref scaling_policy) = config.scaling {
250            Arc::new(
251                ShardManager::with_mapper(
252                    config.num_shards,
253                    config.ring_buffer_capacity,
254                    config.backpressure_mode,
255                    scaling_policy.clone(),
256                )
257                .map_err(|e| AdapterError::Fatal(e.to_string()))?,
258            )
259        } else {
260            Arc::new(ShardManager::new(
261                config.num_shards,
262                config.ring_buffer_capacity,
263                config.backpressure_mode,
264            ))
265        };
266
267        // Create poll merger.
268        //
269        // Pass the live id set rather than the count. At
270        // initial construction the ids are dense (`0..num_shards`),
271        // but using `shard_ids()` here keeps a single code path with
272        // the post-scaling re-stores below.
273        let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
274            adapter.clone(),
275            shard_manager.shard_ids(),
276        ));
277
278        // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
279        // doc on `EventBus` for the synchronization contract.
280        let shutdown = Arc::new(AtomicBool::new(false));
281        let drain_finalize_ready = Arc::new(AtomicBool::new(false));
282
283        // Stats are shared with every BatchWorker spawn so successful
284        // dispatches increment `batches_dispatched` / `events_dispatched`
285        // — `flush()`'s Phase 2 progress probe gates on those.
286        let stats = Arc::new(EventBusStats::default());
287
288        // Producer nonce. Persistent path → load-or-create
289        // the durable u64 so cross-process retries dedup against the
290        // prior incarnation. No path → per-process default (today's
291        // at-most-once-across-restart behavior).
292        let producer_nonce = match &config.producer_nonce_path {
293            Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
294                .map_err(|e| {
295                    AdapterError::Fatal(format!(
296                        "failed to load/create producer-nonce file at {}: {e}",
297                        path.display(),
298                    ))
299                })?
300                .nonce(),
301            None => crate::event::batch_process_nonce(),
302        };
303
304        // Create batch workers for each shard
305        let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
306            std::collections::HashMap::with_capacity(config.num_shards as usize);
307        let mut batch_senders =
308            std::collections::HashMap::with_capacity(config.num_shards as usize);
309
310        for shard_id in 0..config.num_shards {
311            let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
312
313            let next_sequence = Arc::new(AtomicU64::new(0));
314
315            let batch = spawn_batch_worker(BatchWorkerParams {
316                shard_id,
317                rx,
318                adapter: adapter.clone(),
319                shard_manager: shard_manager.clone(),
320                config: config.batch.clone(),
321                adapter_timeout: config.adapter_timeout,
322                batch_retries: config.adapter_batch_retries,
323                next_sequence: next_sequence.clone(),
324                stats: stats.clone(),
325                producer_nonce,
326            });
327
328            let drain = spawn_drain_worker_for_shard(
329                shard_id,
330                shard_manager.clone(),
331                tx.clone(),
332                shutdown.clone(),
333                drain_finalize_ready.clone(),
334            );
335
336            batch_workers.insert(
337                shard_id,
338                ShardWorkers {
339                    batch,
340                    drain,
341                    next_sequence,
342                },
343            );
344            batch_senders.insert(shard_id, tx);
345        }
346
347        let bus = Self {
348            shard_manager,
349            adapter,
350            poll_merger,
351            poll_merger_swap_lock: parking_lot::Mutex::new(()),
352            batch_workers: parking_lot::Mutex::new(batch_workers),
353            batch_senders: parking_lot::RwLock::new(batch_senders),
354            shutdown,
355            drain_finalize_ready,
356            in_flight_ingests: AtomicU64::new(0),
357            shutdown_completed: AtomicBool::new(false),
358            config,
359            stats,
360            producer_nonce,
361            scaling_monitor: parking_lot::Mutex::new(None),
362        };
363
364        Ok(bus)
365    }
366
367    /// Start the scaling monitor (if dynamic scaling is enabled).
368    /// This spawns a background task that periodically evaluates scaling decisions.
369    ///
370    /// The spawned task holds a `Weak<Self>` rather than a strong
371    /// `Arc<Self>` clone. With a strong clone the task kept the bus
372    /// alive forever, and `shutdown(self)` (which consumes by value)
373    /// was unreachable: callers with an `Arc<EventBus>` could not
374    /// `Arc::try_unwrap` to consume it because the spawned task always
375    /// held one of the strong refs.
376    ///
377    /// With a `Weak`, the monitor task upgrades each tick. Once the
378    /// last caller-held `Arc` is dropped, the upgrade fails and the
379    /// task exits cleanly. To shut down via `shutdown(self)`, the
380    /// caller must hold the only strong reference: `Arc::try_unwrap`
381    /// on the resulting bus succeeds because the spawned task only
382    /// holds a Weak.
383    pub fn start_scaling_monitor(self: &Arc<Self>) {
384        if self.config.scaling.is_none() {
385            return;
386        }
387
388        // Idempotency check: no-op when a monitor is already
389        // installed. Otherwise a second `start_scaling_monitor`
390        // call would overwrite the slot without aborting the
391        // previous `JoinHandle` — the displaced task would keep
392        // running, holding a `Weak<EventBus>`, only exiting when it
393        // next observed `shutdown` or failed to upgrade. Two
394        // concurrent monitors would briefly compete for
395        // `evaluate_scaling`'s lock, doubling the metrics-tick
396        // wakeup rate.
397        let mut slot = self.scaling_monitor.lock();
398        if slot.is_some() {
399            tracing::debug!("start_scaling_monitor: monitor already running, skipping");
400            return;
401        }
402
403        let weak = Arc::downgrade(self);
404        let handle = tokio::spawn(async move {
405            run_scaling_monitor_via_weak(weak).await;
406        });
407
408        *slot = Some(handle);
409    }
410
411    /// Internal: Add a new shard with its workers.
412    ///
413    /// The previous implementation called `shard_manager.add_shard()`
414    /// first, which atomically marked the shard `Active` and published
415    /// it to the routing table. So `select_shard` could route producer
416    /// pushes to the new id *before* any drain or batch worker existed,
417    /// leaving events queued in a buffer with no consumer (and
418    /// triggering the configured backpressure mode if the buffer
419    /// filled).
420    ///
421    /// The fix uses the two-phase API on `ShardManager`:
422    ///   1. `add_shard()` allocates the id and metrics collector,
423    ///      adds the shard to the routing table in `Provisioning`
424    ///      state — so `with_shard` works (which the drain worker
425    ///      needs) but `select_shard` skips it.
426    ///   2. Spawn batch + drain workers and register the sender.
427    ///   3. `activate_shard()` flips state to `Active`. Only now
428    ///      does `select_shard` start routing producer pushes.
429    async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
430        self.add_shard_internal_with_cooldown_policy(false).await
431    }
432
433    /// Like [`add_shard_internal`] but bypasses the auto-scaling
434    /// cooldown. See [`ShardManager::add_shard_force`].
435    async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
436        self.add_shard_internal_with_cooldown_policy(true).await
437    }
438
439    async fn add_shard_internal_with_cooldown_policy(
440        &self,
441        force: bool,
442    ) -> Result<u16, AdapterError> {
443        // Step 1: provisioning add — not yet selectable.
444        let new_id = if force {
445            self.shard_manager.add_shard_force()
446        } else {
447            self.shard_manager.add_shard()
448        }
449        .map_err(|e| AdapterError::Fatal(e.to_string()))?;
450
451        // Step 2: spawn workers and register the sender.
452        let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
453
454        let next_sequence = Arc::new(AtomicU64::new(0));
455
456        let batch = spawn_batch_worker(BatchWorkerParams {
457            shard_id: new_id,
458            rx,
459            adapter: self.adapter.clone(),
460            shard_manager: self.shard_manager.clone(),
461            config: self.config.batch.clone(),
462            adapter_timeout: self.config.adapter_timeout,
463            batch_retries: self.config.adapter_batch_retries,
464            next_sequence: next_sequence.clone(),
465            stats: self.stats.clone(),
466            producer_nonce: self.producer_nonce,
467        });
468
469        // Order of publish here matters. Today `select_shard`
470        // filters on `state == Active` and a shard is still
471        // `Provisioning` at this point, so producers cannot route
472        // to it yet — but the pattern (insert sender → spawn drain
473        // worker → activate_shard) is ordering-fragile: a future
474        // refactor that flips `activate_shard` ahead of the drain-
475        // worker spawn would leave a window where producers can
476        // push but no drain worker is polling. Pin the order by
477        // installing the sender BEFORE the drain worker (existing
478        // behaviour) so the worker observes a fully-published
479        // sender at spawn time; `activate_shard` runs further down.
480        self.batch_senders.write().insert(new_id, tx.clone());
481
482        let drain = spawn_drain_worker_for_shard(
483            new_id,
484            self.shard_manager.clone(),
485            tx,
486            self.shutdown.clone(),
487            self.drain_finalize_ready.clone(),
488        );
489
490        self.batch_workers.lock().insert(
491            new_id,
492            ShardWorkers {
493                batch,
494                drain,
495                next_sequence,
496            },
497        );
498
499        // Step 3: workers are live — flip the shard to Active so
500        // `select_shard` will route to it.
501        //
502        // On `activate_shard` failure we mirror
503        // `remove_shard_internal`'s teardown: drop the sender,
504        // unmap the provisioning entry (which atomically pops any
505        // residual ring-buffer events), gracefully await both
506        // workers (so the drain worker's `scratch` Vec sends its
507        // contents on the channel and the batch worker's
508        // `current_batch` is flushed via the channel-close path),
509        // then dispatch any stranded ring-buffer events through
510        // the adapter. Pre-fix this used `.abort()` on both
511        // handles which dropped the drain
512        // worker's scratch and the batch worker's current_batch
513        // without dispatch.
514        if let Err(e) = self.shard_manager.activate_shard(new_id) {
515            tracing::warn!(
516                shard_id = new_id,
517                error = %e,
518                "activate_shard failed; rolling back provisioning state"
519            );
520
521            // 1. Drop the bus-side sender. The drain worker still
522            //    holds its own clone, so the channel stays open
523            //    until `with_shard` returns None (step 2) and the
524            //    drain worker breaks out of its loop, dropping
525            //    its sender and finally closing the channel.
526            self.batch_senders.write().remove(&new_id);
527
528            // 2. Atomically pop any ring-buffer residue and
529            //    unmap the Provisioning entry. After this,
530            //    `with_shard(new_id)` returns None and the drain
531            //    worker exits at its next poll (after sending
532            //    any events it had already popped into `scratch`
533            //    on this iteration).
534            //
535            //    For a brand-new Provisioning shard the buffer
536            //    should be empty (`select_shard` skips
537            //    non-Active states), so `stranded` is normally
538            //    `Vec::new()`. The flush below is a defensive
539            //    no-op on the happy path but covers any future
540            //    code path that routes to a Provisioning shard
541            //    or any race window that tucked an event in
542            //    before `activate_shard` returned its error.
543            let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
544
545            // 3. Take ownership of the worker handles and await
546            //    them gracefully. Order: drain first (it pumps
547            //    its scratch + final-sweep contents into the
548            //    channel and exits), then batch (which receives
549            //    those events plus any prior channel residue,
550            //    flushes its own `current_batch`, and exits).
551            //
552            //    Awaiting in this order is what makes the drain
553            //    worker's scratch reach the adapter — the
554            //    drain's `Some(N>0)` arm `mem::replace`s scratch
555            //    into a batch and `sender.send(batch).await`s
556            //    it; that send must complete (or fail) before
557            //    the drain worker breaks. The batch worker's
558            //    `Ok(None)` arm then runs after both senders
559            //    are dropped and flushes any pending batch.
560            // Bound each JoinHandle await so a worker that's
561            // parked inside a slow adapter call (e.g. drain worker
562            // mid-`sender.send().await` against a backpressured
563            // channel because the batch worker is itself blocked
564            // in the adapter) cannot pin rollback indefinitely.
565            // 2x `adapter_timeout` is the natural ceiling: the
566            // batch worker uses `adapter_timeout` per dispatch and
567            // is expected to wake within that window. A timeout
568            // here leaks the JoinHandle — acceptable because
569            // step 1 already removed the bus-side sender, so the
570            // detached task can't observe new work and will exit
571            // on its next loop iteration.
572            let workers = self.batch_workers.lock().remove(&new_id);
573            // `worker_detached` is set when either join times out
574            // — the worker is still running on a leaked
575            // JoinHandle. In that case the `next_sequence` atomic
576            // is no longer a reliable upper bound: the detached
577            // worker may publish a final batch whose msg-ids land
578            // in the same `[next_sequence..N]` range we'd use for
579            // the stranded-flush, producing duplicate XADDs or
580            // JetStream dedup hits. Skip the stranded-flush when
581            // detached and surface the loss explicitly so the
582            // operator sees it.
583            let mut worker_detached = false;
584            let final_next_sequence = if let Some(workers) = workers {
585                let bound = self.config.adapter_timeout.saturating_mul(2);
586                match tokio::time::timeout(bound, workers.drain).await {
587                    Ok(Ok(())) => {}
588                    Ok(Err(err)) => {
589                        tracing::warn!(
590                            shard_id = new_id,
591                            error = %err,
592                            "drain worker JoinHandle errored on activate-failure rollback"
593                        );
594                    }
595                    Err(_) => {
596                        tracing::warn!(
597                            shard_id = new_id,
598                            timeout_ms = bound.as_millis() as u64,
599                            "drain worker did not exit within timeout on activate-failure rollback; detaching"
600                        );
601                        worker_detached = true;
602                    }
603                }
604                match tokio::time::timeout(bound, workers.batch).await {
605                    Ok(Ok(())) => {}
606                    Ok(Err(err)) => {
607                        tracing::warn!(
608                            shard_id = new_id,
609                            error = %err,
610                            "BatchWorker JoinHandle errored on activate-failure rollback"
611                        );
612                    }
613                    Err(_) => {
614                        tracing::warn!(
615                            shard_id = new_id,
616                            timeout_ms = bound.as_millis() as u64,
617                            "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
618                        );
619                        worker_detached = true;
620                    }
621                }
622                workers.next_sequence.load(AtomicOrdering::Acquire)
623            } else {
624                0
625            };
626
627            // 4. Dispatch any stranded events as a single-shot
628            //    batch so they reach durable storage with the
629            //    correct sequence-id segment. Identical to the
630            //    `remove_shard_internal` teardown — but only when
631            //    the worker actually exited. If it timed out and
632            //    is still running on a leaked handle, dispatching
633            //    here would emit msg-ids overlapping the worker's
634            //    final flush; surface the events as dropped
635            //    instead so the duplicate-on-the-wire hazard is
636            //    avoided.
637            if !stranded.is_empty() && worker_detached {
638                let count = stranded.len();
639                self.stats
640                    .events_dropped
641                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
642                self.stats
643                    .shutdown_was_lossy
644                    .store(true, AtomicOrdering::Release);
645                tracing::error!(
646                    shard_id = new_id,
647                    count,
648                    "activate-failure rollback: skipping stranded-flush \
649                     because a worker JoinHandle timed out and may still \
650                     be running; events would collide with the detached \
651                     worker's final batch on the wire. Counted as dropped."
652                );
653            } else if !stranded.is_empty() {
654                let count = stranded.len();
655                let batch = crate::event::Batch::with_nonce(
656                    new_id,
657                    stranded,
658                    final_next_sequence,
659                    self.producer_nonce,
660                );
661                let dispatched = dispatch_batch(
662                    &*self.adapter,
663                    Arc::new(batch),
664                    new_id,
665                    self.config.adapter_timeout,
666                    self.config.adapter_batch_retries,
667                )
668                .await;
669                if dispatched {
670                    self.stats
671                        .batches_dispatched
672                        .fetch_add(1, AtomicOrdering::Relaxed);
673                    self.stats
674                        .events_dispatched
675                        .fetch_add(count as u64, AtomicOrdering::Relaxed);
676                    tracing::info!(
677                        shard_id = new_id,
678                        count,
679                        "activate-failure rollback: flushed stranded events to adapter",
680                    );
681                } else {
682                    tracing::error!(
683                        shard_id = new_id,
684                        count,
685                        "activate-failure rollback: adapter rejected stranded events; \
686                         events lost"
687                    );
688                }
689            }
690
691            return Err(AdapterError::Fatal(e.to_string()));
692        }
693
694        // Update poll merger with the post-add id set. Hold
695        // `poll_merger_swap_lock` across the snapshot-and-store so a
696        // concurrent remove_shard can't sneak between our `shard_ids()`
697        // read and our `arc_swap.store` and clobber the published view
698        // with a stale snapshot.
699        {
700            let _swap_guard = self.poll_merger_swap_lock.lock();
701            self.poll_merger.store(Arc::new(PollMerger::new(
702                self.adapter.clone(),
703                self.shard_manager.shard_ids(),
704            )));
705        }
706
707        tracing::info!(shard_id = new_id, "Added new shard");
708        Ok(new_id)
709    }
710
711    /// Internal: Remove a stopped shard.
712    ///
713    /// Previously this dropped the worker `JoinHandle`s and unmapped
714    /// the shard without first draining its ring buffer. Any events
715    /// still queued at the moment of removal — even just a few from a
716    /// producer that pushed concurrently with the scale-down decision
717    /// — were silently stranded once the drain worker exited on
718    /// `with_shard → None`.
719    ///
720    /// The fix:
721    ///   1. Wait for the drain worker we're about to retire to flush
722    ///      the channel, by closing the bus-side sender first.
723    ///   2. Call `remove_shard`, which atomically pops the
724    ///      ring-buffer remnants and unmaps the shard. The drained
725    ///      events come back to us as a `Vec`.
726    ///   3. Hand those events directly to the adapter as a
727    ///      single-shot batch — bypassing the per-shard pipeline
728    ///      that's already being torn down — so they reach durable
729    ///      storage.
730    async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
731        // Step 1: drop the bus-side sender. The drain worker still
732        // has its own clone and will keep draining; we want it to
733        // exit when its `with_shard` call returns `None` after
734        // step 2's unmap.
735        self.batch_senders.write().remove(&shard_id);
736
737        // Step 2: atomically drain whatever's in the ring buffer and
738        // unmap. After this, `with_shard(shard_id)` returns `None`
739        // and the drain worker exits at its next poll.
740        let stranded = self
741            .shard_manager
742            .remove_shard(shard_id)
743            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
744
745        // Step 3: take ownership of the worker handles (move them
746        // OUT of the mutex map so we can `await` them — `await`
747        // consumes a `JoinHandle`). With the bus-side sender already
748        // dropped (step 1) and the shard unmapped (step 2), the
749        // drain worker exits at its next poll and drops its sender
750        // clone, which closes the BatchWorker's `rx`. The
751        // BatchWorker then flushes any pending `current_batch` and
752        // any events still buffered in the channel, dispatches them
753        // via the standard `dispatch_batch` path with their PROPER
754        // `next_sequence` values, and exits.
755        //
756        // Await order: drain first, then batch. The drain worker's
757        // `Some(N>0)` arm `mem::replace`s scratch into a batch and
758        // `sender.send(batch).await`s it; that send must complete
759        // (or fail) before drain breaks. The batch worker's
760        // `Ok(None)` arm runs after the sender drops and flushes
761        // any pending batch. Awaiting in the reverse order would
762        // park here forever: the batch worker's `recv()` only
763        // returns `None` once every sender clone (including the
764        // drain worker's) has dropped.
765        //
766        // Both awaits are bounded by `2 × adapter_timeout` so a
767        // worker parked inside a slow adapter call cannot pin
768        // teardown indefinitely. A timeout leaks the JoinHandle —
769        // acceptable because step 1 already removed the bus-side
770        // sender and step 2 unmapped the shard, so the detached
771        // task can't observe new work and will exit on its next
772        // loop iteration.
773        let workers = self.batch_workers.lock().remove(&shard_id);
774        let final_next_sequence = if let Some(workers) = workers {
775            let bound = self.config.adapter_timeout.saturating_mul(2);
776            match tokio::time::timeout(bound, workers.drain).await {
777                Ok(Ok(())) => {}
778                Ok(Err(e)) => {
779                    tracing::warn!(
780                        shard_id,
781                        error = %e,
782                        "drain worker JoinHandle errored on await; \
783                         drain worker should have already exited via \
784                         `with_shard -> None`",
785                    );
786                }
787                Err(_) => {
788                    tracing::warn!(
789                        shard_id,
790                        timeout_ms = bound.as_millis() as u64,
791                        "drain worker did not exit within timeout on remove_shard; detaching",
792                    );
793                }
794            }
795            match tokio::time::timeout(bound, workers.batch).await {
796                Ok(Ok(())) => {}
797                Ok(Err(e)) => {
798                    tracing::warn!(
799                        shard_id,
800                        error = %e,
801                        "BatchWorker JoinHandle errored on await; \
802                         proceeding with stranded-flush using last \
803                         published next_sequence",
804                    );
805                }
806                Err(_) => {
807                    tracing::warn!(
808                        shard_id,
809                        timeout_ms = bound.as_millis() as u64,
810                        "BatchWorker did not exit within timeout on remove_shard; detaching",
811                    );
812                }
813            }
814            workers.next_sequence.load(AtomicOrdering::Acquire)
815        } else {
816            // No worker registered for this shard — shouldn't
817            // happen on the normal scale-down path, but defensively
818            // fall back to 0. This branch only activates if a
819            // caller manages to call `remove_shard_internal` for a
820            // shard that was never spawned (or already removed).
821            0
822        };
823
824        // Step 4: flush the stranded ring-buffer events through the
825        // adapter in one shot, using `final_next_sequence` (NOT 0)
826        // as the `sequence_start`. The stranded batch's msg-ids
827        // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
828        // strictly past every msg-id the worker emitted. Using 0
829        // would collide with the worker's very first batch
830        // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
831        // window would silently drop the duplicate.
832        if !stranded.is_empty() {
833            let count = stranded.len();
834            // Use the bus's loaded producer nonce so the stranded
835            // batch's msg-ids share the same producer-identity
836            // segment as everything else this bus has emitted —
837            // critical for cross-process dedup.
838            let batch = crate::event::Batch::with_nonce(
839                shard_id,
840                stranded,
841                final_next_sequence,
842                self.producer_nonce,
843            );
844            let dispatched = dispatch_batch(
845                &*self.adapter,
846                Arc::new(batch),
847                shard_id,
848                self.config.adapter_timeout,
849                self.config.adapter_batch_retries,
850            )
851            .await;
852            if dispatched {
853                self.stats
854                    .batches_dispatched
855                    .fetch_add(1, AtomicOrdering::Relaxed);
856                self.stats
857                    .events_dispatched
858                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
859                tracing::info!(
860                    shard_id,
861                    count,
862                    sequence_start = final_next_sequence,
863                    "Removed shard: flushed stranded ring-buffer events to adapter"
864                );
865            } else {
866                tracing::error!(
867                    shard_id,
868                    count,
869                    sequence_start = final_next_sequence,
870                    "Removed shard: adapter rejected stranded ring-buffer events; \
871                     events lost"
872                );
873            }
874        }
875
876        // Update poll merger with the post-remove id set.
877        // Without this, a default-shards poll (`request.shards == None`)
878        // would still iterate `0..num_shards` and skip the live shard
879        // whose id is now the largest, while polling a nonexistent /
880        // recreated shard at the bottom of the range.
881        //
882        // `poll_merger_swap_lock` serializes against
883        // `add_shard_internal`'s matching block — see the field doc.
884        {
885            let _swap_guard = self.poll_merger_swap_lock.lock();
886            self.poll_merger.store(Arc::new(PollMerger::new(
887                self.adapter.clone(),
888                self.shard_manager.shard_ids(),
889            )));
890        }
891
892        tracing::info!(shard_id = shard_id, "Removed shard");
893        Ok(())
894    }
895
896    /// Try to enter an ingest critical section. Returns `None` if
897    /// shutdown is in progress, in which case the caller must
898    /// return `IngestionError::ShuttingDown` without touching the
899    /// shard manager.
900    ///
901    /// The `fetch_add` + load(`shutdown`) sequence pairs with
902    /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
903    /// `in_flight_ingests`. SeqCst on both sides closes the
904    /// stranding race: every ingest that is observed as in-flight
905    /// during shutdown's wait is guaranteed to complete before the
906    /// drain workers do their final ring-buffer sweep, so no event
907    /// can land in a ring buffer after the drain worker has stopped
908    /// reading from it.
909    #[inline(always)]
910    fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
911        self.in_flight_ingests.fetch_add(1, AtomicOrdering::SeqCst);
912        if self.shutdown.load(AtomicOrdering::SeqCst) {
913            self.in_flight_ingests.fetch_sub(1, AtomicOrdering::SeqCst);
914            return None;
915        }
916        Some(IngestGuard { bus: self })
917    }
918
919    /// Ingest an event.
920    ///
921    /// This is a non-blocking operation. The event is added to the appropriate
922    /// shard's ring buffer and will be batched and persisted asynchronously.
923    ///
924    /// # Returns
925    ///
926    /// The shard ID and insertion timestamp on success.
927    #[inline]
928    pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
929        let _g = self
930            .try_enter_ingest()
931            .ok_or(IngestionError::ShuttingDown)?;
932
933        match self.shard_manager.ingest(event.into_inner()) {
934            Ok((shard_id, ts)) => {
935                self.stats
936                    .events_ingested
937                    .fetch_add(1, AtomicOrdering::Relaxed);
938                Ok((shard_id, ts))
939            }
940            Err(e) => {
941                self.stats
942                    .events_dropped
943                    .fetch_add(1, AtomicOrdering::Relaxed);
944                Err(e)
945            }
946        }
947    }
948
949    /// Ingest a raw event (pre-serialized with cached hash).
950    ///
951    /// This is the fastest ingestion path:
952    /// - Uses pre-computed hash for shard selection (no serialization)
953    /// - Stores bytes directly (no clone needed, reference-counted)
954    ///
955    /// # Returns
956    ///
957    /// The shard ID and insertion timestamp on success.
958    #[inline]
959    pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
960        let _g = self
961            .try_enter_ingest()
962            .ok_or(IngestionError::ShuttingDown)?;
963
964        match self.shard_manager.ingest_raw(event) {
965            Ok((shard_id, ts)) => {
966                self.stats
967                    .events_ingested
968                    .fetch_add(1, AtomicOrdering::Relaxed);
969                Ok((shard_id, ts))
970            }
971            Err(e) => {
972                self.stats
973                    .events_dropped
974                    .fetch_add(1, AtomicOrdering::Relaxed);
975                Err(e)
976            }
977        }
978    }
979
980    /// Ingest a batch of events.
981    ///
982    /// This is more efficient than calling `ingest` repeatedly: events
983    /// destined for the same shard share a single mutex acquisition.
984    ///
985    /// # Returns
986    ///
987    /// The number of successfully ingested events.
988    pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
989        // The shutdown gate lives in `ingest_raw_batch`, which we
990        // forward to. No separate guard here — that would double-
991        // count `in_flight_ingests` (once for this call, once for
992        // the inner call) and could deadlock shutdown under high
993        // contention.
994        let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
995        self.ingest_raw_batch(raw)
996    }
997
998    /// Ingest a batch of raw events (fastest batch ingestion).
999    ///
1000    /// Groups events by their destination shard and pushes each group
1001    /// under a single mutex acquisition.
1002    ///
1003    /// # Returns
1004    ///
1005    /// The number of successfully ingested events.
1006    pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1007        let _g = match self.try_enter_ingest() {
1008            Some(g) => g,
1009            None => return 0,
1010        };
1011
1012        let total = events.len();
1013        let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1014        if success > 0 {
1015            self.stats
1016                .events_ingested
1017                .fetch_add(success as u64, AtomicOrdering::Relaxed);
1018        }
1019        // Subtract `unrouted` from the buffer-fullness drop count
1020        // so the same event isn't tallied in both `events_unrouted`
1021        // (bumped inside `ShardManager::ingest_raw_batch`) and
1022        // `events_dropped` — using a plain `total - success` here
1023        // would double-count unrouted events. Backpressure drops
1024        // = events that reached a shard but failed to push.
1025        let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1026        if dropped > 0 {
1027            self.stats
1028                .events_dropped
1029                .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1030        }
1031        success
1032    }
1033
1034    /// Poll events from the bus.
1035    ///
1036    /// This retrieves events from storage according to the request parameters.
1037    ///
1038    /// # Topology-change visibility
1039    ///
1040    /// `ArcSwap::load()` snapshots the current `PollMerger` for
1041    /// the duration of this call. A concurrent `add_shard` /
1042    /// `remove_shard_internal` that `.store()`s a fresh merger
1043    /// only affects **subsequent** polls — this poll continues
1044    /// against the loaded snapshot's shard list.
1045    ///
1046    /// The implications:
1047    ///   - **add_shard mid-poll:** events ingested into the new
1048    ///     shard between the merger swap and our return are
1049    ///     invisible to this call. They appear on the next poll.
1050    ///   - **remove_shard_internal mid-poll:** the stale merger
1051    ///     still has the removed shard in its id list. Adapters
1052    ///     that lazy-create streams on `poll_shard` (JetStream
1053    ///     in particular) may recreate the stream and return
1054    ///     empty/stale data. The drained events are dispatched
1055    ///     to durable storage by `remove_shard_internal` itself
1056    ///     before this poll's adapter call lands; the next poll
1057    ///     loads the new merger and sees the correct shard set.
1058    ///
1059    /// In both cases the loss is transient and self-healing:
1060    /// pagination via `next_id` and the next poll's cursor pick
1061    /// up where we left off. Callers requiring strict
1062    /// "topology-stable" semantics should serialize their polls
1063    /// against scaling operations externally.
1064    pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1065        let merger = self.poll_merger.load();
1066        merger.poll(request).await
1067    }
1068
1069    /// Get the number of shards.
1070    pub fn num_shards(&self) -> u16 {
1071        self.shard_manager.num_shards()
1072    }
1073
1074    /// Get the adapter name.
1075    pub fn adapter_name(&self) -> &'static str {
1076        self.adapter.name()
1077    }
1078
1079    /// Check if the adapter is healthy.
1080    pub async fn is_healthy(&self) -> bool {
1081        self.adapter.is_healthy().await
1082    }
1083
1084    /// Get statistics.
1085    pub fn stats(&self) -> &EventBusStats {
1086        &self.stats
1087    }
1088
1089    /// Get shard statistics.
1090    pub fn shard_stats(&self) -> crate::shard::ShardStats {
1091        self.shard_manager.stats()
1092    }
1093
1094    /// Sum of `len()` across every shard's ring buffer.
1095    ///
1096    /// Mainly useful in tests and operational diagnostics: a
1097    /// non-zero value at the time of `Drop` (without an awaited
1098    /// `shutdown()`) would be silently lost, so `Drop` folds this
1099    /// into `events_dropped` before the bus disappears.
1100    pub fn pending_in_rings(&self) -> u64 {
1101        self.shard_manager.total_pending_in_rings()
1102    }
1103
    /// Flush all pending batches.
    ///
    /// Waits for every shard ring buffer to drain. Then it waits until
    /// every event ingested before the call is accounted for as
    /// dispatched or dropped and the rings are empty again. Only then
    /// does it call `adapter.flush()`.
    ///
    /// # Latency bound
    ///
    /// The total wall-clock budget is the sum of three phases:
    ///   * Phase 1 (ring-buffer drain): up to **5 s**.
    ///   * Phase 2 (dispatch-accounting barrier): up to
    ///     `min(2 s, batch.max_delay × num_shards)`. The 2 s cap keeps a
    ///     misconfigured `max_delay` from inflating the budget.
    ///   * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
    ///     (default **30 s**).
    ///
    /// Worst-case runtime is therefore roughly **37 s under default
    /// config**. Callers that put `flush()` on a request path (HTTP
    /// handler, RPC) MUST tune `adapter_timeout` accordingly or run the
    /// flush under their own outer timeout.
    ///
    /// When the Phase 1 or Phase 2 deadline fires, a `tracing::warn!`
    /// is logged and the flush proceeds to the next phase. Only a
    /// Phase 3 timeout is surfaced as an error.
    ///
    /// # Errors
    ///
    /// Returns the result of `adapter.flush()`, or
    /// `AdapterError::Fatal` if that call exceeds `adapter_timeout`.
    ///
    /// # Caveats
    ///
    /// NOTE(review): the Phase 2 barrier
    /// (`events_dispatched + events_dropped >= events_ingested at entry`)
    /// can be satisfied before every pre-flush event is delivered.
    /// `ingest`, `ingest_raw` and `ingest_raw_batch` add backpressure
    /// rejections to `events_dropped` without counting them in
    /// `events_ingested`. Post-flush dispatches on a fast shard also
    /// count toward the target. Treat `flush()` as a hard delivery
    /// barrier only during quiet ingest, or use `shutdown`, which gates
    /// ingest. Separating ingest-side drops from dispatch-side drops
    /// would need a dedicated counter.
    pub async fn flush(&self) -> Result<(), AdapterError> {
        let start = tokio::time::Instant::now();
        let timeout = Duration::from_secs(5);
        let mut backoff = Duration::from_micros(100);

        // Phase 1: wait for ring buffers to drain (drain workers
        // pump them into the per-shard mpsc channels). Exponential
        // backoff from 100 µs, capped at 10 ms per sleep.
        loop {
            if self.shard_manager.all_shards_empty() {
                break;
            }
            if start.elapsed() >= timeout {
                tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
                break;
            }
            tokio::time::sleep(backoff).await;
            backoff = (backoff * 2).min(Duration::from_millis(10));
        }

        // Phase 2: wait until every event ingested before flush()
        // started has been handed to the adapter. Snapshot
        // `events_ingested` at entry as the target, then poll
        // `events_dispatched` (the sum of batch lengths from
        // successful adapter dispatches) plus `events_dropped`. The
        // barrier is met once `events_dispatched + events_dropped >=
        // target` AND the ring buffers are empty.
        //
        // This counter-based barrier replaces a "sleep one `max_delay`
        // and hope" heuristic. That heuristic raced a BatchWorker whose
        // `batch_start` was set just before flush() ran, and it flaked
        // under coarse (e.g. Windows) timer resolution.
        //
        // NOTE(review): `events_dropped` also counts ingest-side
        // backpressure rejections, which never enter `events_ingested`.
        // The sum can therefore reach the target early; see the
        // Caveats section of the doc comment.
        let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
        let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
        let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);

        // An outer deadline bounds Phase 2 in case a wedged adapter
        // never returns. `max_delay * num_workers` is the worst-case
        // shape: one partially-filled batch per worker, each waiting
        // its full `max_delay` to time out. The total is capped at 2 s.
        //
        // The worker count comes from the shard manager's
        // `num_shards()` rather than `batch_workers.lock().len()`, so
        // this async fn never takes the lock that
        // `remove_shard_internal` also takes. A worker-count vs
        // shard-count mismatch only shifts the deadline by one
        // `max_delay` step, which the 2 s cap still bounds.
        let n_workers = usize::from(self.shard_manager.num_shards());
        let phase2_budget = self
            .config
            .batch
            .max_delay
            .saturating_mul(n_workers.max(1) as u32);
        let phase2_deadline =
            tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));

        // Inner poll cadence: re-check the counters every
        // `max(max_delay / 16, 1 ms)`. The short interval lets us exit
        // promptly once the BatchWorker dispatches, instead of waking
        // once per `max_delay` and racing the dispatch.
        let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
        loop {
            let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
            let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
            // Barrier: every event ingested pre-flush has been either
            // dispatched or dropped, and no ring buffer holds pending
            // events. The comparison uses *totals*, not deltas since
            // entry. Subtracting `dropped_at_start` would require that
            // many extra post-flush events, so quiet workloads would
            // hang until the deadline.
            //
            // Known gaps:
            //   * `dropped` includes ingest-side rejections (never
            //     ingested), so the sum can overshoot.
            //   * A fast shard's post-flush dispatches can satisfy the
            //     global target while a slow shard's pre-flush events
            //     still sit in its mpsc channel or pending batch.
            //     `all_shards_empty()` only checks ring buffers.
            // Callers that need a hard delivery barrier should flush
            // during quiet ingest or use `shutdown`, which gates ingest
            // via `try_enter_ingest`.
            let _ = dispatched_at_start; // reserved for future per-shard accounting
            if dispatched.saturating_add(dropped) >= target_ingested
                && self.shard_manager.all_shards_empty()
            {
                break;
            }
            if tokio::time::Instant::now() >= phase2_deadline {
                // NOTE(review): the logged `dropped` is the delta since
                // entry, while the barrier above uses the total.
                tracing::warn!(
                    target = target_ingested,
                    dispatched,
                    dropped = dropped.saturating_sub(dropped_at_start),
                    "flush: Phase 2 deadline reached before all ingested events were dispatched",
                );
                break;
            }
            tokio::time::sleep(poll_interval).await;
        }

        // Phase 3: tell the adapter to flush whatever it has
        // buffered. Bounded by `adapter_timeout` so a hanging
        // adapter can't pin us forever; a timeout is surfaced as
        // `AdapterError::Fatal`.
        match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
            Ok(r) => r,
            Err(_) => {
                tracing::warn!(
                    "flush: adapter.flush timed out after {:?}",
                    self.config.adapter_timeout
                );
                Err(AdapterError::Fatal("adapter flush timed out".into()))
            }
        }
    }
1267
1268    /// Gracefully shut down the event bus.
1269    ///
1270    /// The shutdown order is load-bearing:
1271    ///
1272    ///   1. Signal `shutdown` so drain workers stop pulling from
1273    ///      ring buffers after their final sweep.
1274    ///   2. Await **drain workers** so every event the producer
1275    ///      has handed to the bus is now in the per-shard mpsc
1276    ///      channel.
1277    ///   3. Drop `batch_senders` so each channel's last sender is
1278    ///      gone — the next `recv().await` in a batch worker will
1279    ///      return `None`.
1280    ///   4. Await **batch workers**, which drain everything
1281    ///      remaining in their channel and exit on `recv() = None`.
1282    ///
1283    /// Reversing steps 2 and 4 (the previous design) silently
1284    /// dropped events: a batch worker that exited on the shutdown
1285    /// flag could leave events the drain worker pushed *after* its
1286    /// `try_recv` sweep stranded in the channel.
1287    pub async fn shutdown(self) -> Result<(), AdapterError> {
1288        self.shutdown_via_ref().await
1289    }
1290
1291    /// Shutdown via shared reference — same semantics as
1292    /// [`shutdown`](Self::shutdown), but does not consume `self`.
1293    ///
1294    /// Useful for callers that hold the bus behind `Arc<EventBus>`
1295    /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1296    /// into every `EventStream`) and therefore cannot satisfy
1297    /// `Arc::try_unwrap`. Idempotent: the first caller does the
1298    /// work; concurrent or subsequent callers wait for the
1299    /// `shutdown_completed` flag and return `Ok(())`.
1300    pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1301        // 1. CAS the shutdown flag false→true. SeqCst pairs with
1302        // `try_enter_ingest`'s shutdown check — any producer that
1303        // observed the previous `false` and is mid-push has its
1304        // `in_flight_ingests` increment ordered before this store
1305        // (the CAS-success branch is a release of the new `true`),
1306        // and so will be visible to the wait below.
1307        //
1308        // If the CAS loses (someone else — typically a concurrent
1309        // call or `Drop` — already flipped the flag), spin until
1310        // they finish. We can't run the rest of the body because
1311        // workers/senders may already be partially torn down.
1312        if self
1313            .shutdown
1314            .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1315            .is_err()
1316        {
1317            // Bound the wait so a `Drop`-only path (which sets
1318            // `shutdown=true` but never sets `shutdown_completed`)
1319            // doesn't spin forever.
1320            //
1321            // Distinguish the two outcomes for callers. If
1322            // `shutdown_completed` flips inside the window, we
1323            // return `Ok(())` and the caller can be sure the first
1324            // caller finished. If the deadline fires first, we
1325            // surface `AdapterError::Transient(_)` — the bus IS
1326            // being shut down (the flag is set), but completion is
1327            // not yet observable; the caller can treat this as
1328            // "another thread is working on it, retry the
1329            // is_shutdown_completed() poll if you need a hard
1330            // barrier."
1331            //
1332            // Returning `Ok(())` in both branches would let
1333            // shutdown-done assumptions silently drift under a slow
1334            // adapter (`adapter_timeout` default 30 s > the 10 s
1335            // spin deadline), letting subsequent code observe a
1336            // partially-shut-down bus.
1337            // 10s in production builds; overridable via the
1338            // `_TEST_OVERRIDE_SHUTDOWN_VIA_REF_DEADLINE` thread-local
1339            // in test builds so the slow-first-caller test doesn't
1340            // need to wall-clock-wait for the full deadline. The
1341            // override is `#[cfg(test)]`-only; production cargo
1342            // builds compile out the override entirely.
1343            let deadline_dur = shutdown_via_ref_spin_deadline();
1344            // Use `tokio::time::Instant` so tests using
1345            // `tokio::time::pause()` virtualize this clock too.
1346            // Pre-fix `std::time::Instant` was wall-clock and
1347            // ignored `pause()`, breaking timeout-bounded tests
1348            // that wanted to fast-forward the spin deadline.
1349            let deadline = tokio::time::Instant::now() + deadline_dur;
1350            while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1351                if tokio::time::Instant::now() >= deadline {
1352                    return Err(AdapterError::Transient(
1353                        "shutdown_via_ref: another caller is mid-shutdown; \
1354                         deadline elapsed before shutdown_completed \
1355                         flipped. The bus IS shutting down; poll \
1356                         is_shutdown_completed() if you need a hard \
1357                         barrier."
1358                            .into(),
1359                    ));
1360                }
1361                // `yield_now` re-queues immediately and keeps the
1362                // task hot, starving the workers we're waiting on
1363                // under contention. A short `sleep` parks the task
1364                // and lets the runtime schedule the workers.
1365                tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1366            }
1367            return Ok(());
1368        }
1369
1370        // 1a. Wait for in-flight ingests to drain BEFORE the drain
1371        // workers do their final ring-buffer sweep. Otherwise a
1372        // producer that observed `shutdown=false` could push *after*
1373        // the drain worker's last `pop_batch_into` returned zero,
1374        // leaving the event stranded in the ring buffer when the bus
1375        // is dropped.
1376        //
1377        // This is bounded: every producer either bails on the
1378        // SeqCst-synchronized shutdown check (no progress past the
1379        // increment) or completes its single non-blocking push and
1380        // decrements. Both paths take constant time; the total
1381        // wait is O(producer threads).
1382        //
1383        // The "every observed in-flight ingest completes before
1384        // the final sweep" property holds under normal conditions,
1385        // but the 5-second deadline below forces the gate open
1386        // even when producers are still in their push window. A
1387        // producer that has incremented `in_flight_ingests` (and
1388        // so observed `shutdown=false`) but whose push is delayed
1389        // past the deadline (heavy contention, debugger hit, etc.)
1390        // will complete its push AFTER the final sweep — its event
1391        // lands in the ring buffer and is never read. The deadline
1392        // exists so a stuck producer can't deadlock shutdown
1393        // indefinitely; the trade-off is documented data loss past
1394        // the 5 s window, surfaced via the `events_dropped` stat
1395        // (so the loss is observable to operators) and the `WARN`
1396        // log below (so it's diagnosable). The "no stranding"
1397        // promise on the happy path stands; the deadline path is
1398        // the documented escape hatch.
1399        // Use `tokio::time::Instant` so tests using
1400        // `tokio::time::pause()` virtualize this 5-second
1401        // deadline too — pre-fix the `std::time::Instant`
1402        // was wall-clock and ignored the test's paused clock.
1403        let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1404        // Snapshot of `(stranded, ingested, dispatched)` at the
1405        // deadline — Some(...) iff we hit the deadline path. The
1406        // post-drain reconciliation reads this to compute the
1407        // actual drop count (see comment further down).
1408        let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1409        while self.in_flight_ingests.load(AtomicOrdering::SeqCst) > 0 {
1410            if tokio::time::Instant::now() >= in_flight_deadline {
1411                let stranded = self.in_flight_ingests.load(AtomicOrdering::SeqCst);
1412                let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1413                let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1414                tracing::warn!(
1415                    in_flight = stranded,
1416                    lossy = true,
1417                    "shutdown timed out waiting for in-flight ingests after 5s; \
1418                     proceeding — up to {} events may strand in the ring buffer \
1419                     past final drain (documented data-loss path)",
1420                    stranded,
1421                );
1422                // Set the lossy flag immediately so a fast `is_*`
1423                // poll observes the outcome before the drain
1424                // finishes. The actual `events_dropped` bump is
1425                // deferred until after the final drain runs (see
1426                // "post-drain reconciliation" below) so we don't
1427                // double-count events that the drain still
1428                // successfully delivers — pre-fix this bumped
1429                // `events_dropped += stranded` here and the same
1430                // events that the final sweep then drained landed
1431                // in BOTH `events_ingested` and `events_dropped`,
1432                // breaking the bus's
1433                // `ingested == dispatched + dropped` invariant
1434                // and turning `shutdown_was_lossy` into a false
1435                // positive on every deadline-triggered shutdown.
1436                self.stats
1437                    .shutdown_was_lossy
1438                    .store(true, AtomicOrdering::Release);
1439                deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1440                break;
1441            }
1442            // Park instead of `yield_now`. The producers we're
1443            // waiting on contend for the same runtime threads;
1444            // re-queuing immediately starves their progress.
1445            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1446        }
1447
1448        // 1b. Release the drain-finalize gate.
1449        //
1450        // Pre-fix used `Ordering::Release` for this store
1451        // and relied on the SeqCst spin above (loading
1452        // `in_flight_ingests`) to provide the happens-before for
1453        // every observed-pre-shutdown push. That works today
1454        // because SeqCst loads carry an implicit fence, but a
1455        // future change to the spin's ordering (Relaxed for perf,
1456        // say) would silently break the drain worker's final-sweep
1457        // contract — producer pushes might not be visible to
1458        // `pop_batch_into`. Promote to SeqCst so the load-bearing
1459        // happens-before is explicit at this site, not derived
1460        // from another atomic's ordering choice.
1461        //
1462        // Caveat: the SeqCst happens-before above only covers the
1463        // *non-deadline* exit from the spin (the loop condition
1464        // observed `in_flight_ingests == 0`). On the deadline-break
1465        // path the loop exits with `in_flight_ingests > 0` — the
1466        // outstanding producer pushes have NOT been observed
1467        // synchronized-with this thread, and the SeqCst store below
1468        // does not retroactively create a happens-before with
1469        // pushes that are still mid-flight. Those events are
1470        // exactly the "stranded" events accounted via
1471        // `events_dropped` and `shutdown_was_lossy`; the contract
1472        // is "every observed-pre-shutdown push is visible to the
1473        // final sweep on the happy path; on the deadline path,
1474        // up to `stranded` events past the gate are surfaced as
1475        // dropped". The flag at line 1271 (`shutdown_was_lossy`)
1476        // is the operator-visible signal that this contract was
1477        // exercised on the lossy branch.
1478        self.drain_finalize_ready
1479            .store(true, AtomicOrdering::SeqCst);
1480
1481        // Stop the scaling monitor first — it's independent of the
1482        // ingestion path and just needs to observe the flag.
1483        let scaling_handle = self.scaling_monitor.lock().take();
1484        if let Some(handle) = scaling_handle {
1485            let _ = handle.await;
1486        }
1487
1488        // Take workers without holding the lock across await.
1489        let workers = std::mem::take(&mut *self.batch_workers.lock());
1490
1491        // 2. Await drain workers. Each one pops a final batch (up
1492        //    to 10k events) from its ring buffer, sends it on the
1493        //    channel, and exits. After this loop, every event in
1494        //    the ring buffers has been pushed to its channel.
1495        //
1496        // `join_all` lets the runtime overlap drain handles. A
1497        // sequential `for ... { drain.await; }` would serialize
1498        // shutdown wall-clock as N×T instead of max(T), which on
1499        // the default 1024-shard config × per-shard final-drain
1500        // time makes shutdown painful.
1501        let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1502            .into_iter()
1503            .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1504                ((shard_id, drain), (shard_id, batch))
1505            })
1506            .unzip();
1507
1508        // Surface drain-worker JoinErrors explicitly. The default
1509        // Tokio runtime does NOT log spawned-task panics, so a
1510        // `let _ = join_all(...)` would silently swallow a panic
1511        // and mask stranded events. tracing::error per failure
1512        // makes the incident grep-able post-mortem.
1513        let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1514        let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1515        for (shard_id, result) in drain_ids
1516            .iter()
1517            .copied()
1518            .zip(futures::future::join_all(drain_handles).await)
1519        {
1520            if let Err(e) = result {
1521                tracing::error!(
1522                    shard_id,
1523                    error = %e,
1524                    "drain worker JoinHandle errored on shutdown await"
1525                );
1526            }
1527        }
1528
1529        // 3. Drop the original senders so the channels close once
1530        //    drain-worker sender clones (already dropped above)
1531        //    are gone too. Without this, batch workers would block
1532        //    on `recv().await` forever.
1533        drop(std::mem::take(&mut *self.batch_senders.write()));
1534
1535        // 4. Await batch workers. They drain their channel until
1536        //    `recv() = None`, flush, and exit.
1537        //
1538        // Same parallelization as the drain phase, with the same
1539        // explicit JoinError surfacing.
1540        let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1541        for (shard_id, result) in drain_ids
1542            .into_iter()
1543            .zip(futures::future::join_all(batch_only).await)
1544        {
1545            if let Err(e) = result {
1546                tracing::error!(
1547                    shard_id,
1548                    error = %e,
1549                    "BatchWorker JoinHandle errored on shutdown await"
1550                );
1551            }
1552        }
1553
1554        // Flush and shutdown adapter (with timeout to prevent hanging)
1555        let timeout = self.config.adapter_timeout;
1556        if tokio::time::timeout(timeout, self.adapter.flush())
1557            .await
1558            .is_err()
1559        {
1560            tracing::error!("Adapter flush timed out during shutdown");
1561        }
1562        let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1563            .await
1564            .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1565
1566        // Post-drain reconciliation for the lossy-shutdown path.
1567        //
1568        // If we hit the in-flight deadline above, `deadline_snapshot`
1569        // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1570        // Some of those `stranded` producers' events landed in the
1571        // ring AFTER our deadline check but BEFORE the
1572        // `drain_finalize_ready` gate flipped — those events are
1573        // now successfully ingested, drained, and dispatched through
1574        // the adapter. They appear in `events_dispatched` (the
1575        // delta since the deadline), so:
1576        //
1577        //   actual_drops = stranded
1578        //                  - (dispatched_after_drain - dispatched@deadline)
1579        //                  - (ingested_after_drain - ingested@deadline)
1580        //                       only counting events that landed but
1581        //                       weren't dispatched (dropped under
1582        //                       backpressure, etc.)
1583        //
1584        // The cleaner reconciliation: events that completed
1585        // `try_enter_ingest` AFTER the deadline either completed
1586        // ingest (bumping `events_ingested`) or were dropped on
1587        // backpressure (bumping `events_dropped` from the existing
1588        // backpressure paths). The `stranded - delta_ingested`
1589        // remainder is producers whose `try_enter_ingest` succeeded
1590        // but never reached `shard_manager.ingest()` — those are
1591        // the genuinely-lost events we should account for.
1592        if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1593            let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1594            let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1595            // Known under-count window: a producer that completed
1596            // `shard_manager.ingest()` and bumped `events_ingested`
1597            // just BEFORE its `IngestGuard` decremented
1598            // `in_flight_ingests` will be counted in
1599            // `ingested_at_deadline` (already past the bump) AND in
1600            // `stranded` (still pending the decrement on that same
1601            // spin). That single event contributes `+1` to both
1602            // sides of the subtraction, so the saturating_sub
1603            // under-counts the drop by 1 per such interleaving.
1604            // The opposite direction (producer in-flight at
1605            // deadline that never completed ingest) is correctly
1606            // counted as a drop, so the bias is one-sided and
1607            // small (bounded by the number of producers mid-push
1608            // at the exact deadline moment — typically 0–1 even
1609            // under heavy load). Operators reading
1610            // `shutdown_was_lossy` get the right boolean; the
1611            // numeric `events_dropped` may under-count by a few
1612            // events on a lossy shutdown. Acceptable; the cost of
1613            // closing the window is paired-SeqCst reloads under
1614            // the spin which would dominate the shutdown path.
1615            let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1616            if actual_drops > 0 {
1617                self.stats
1618                    .events_dropped
1619                    .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1620            } else {
1621                // The deadline tripped the eager
1622                // `shutdown_was_lossy = true` set above, but the
1623                // final drain ingested every stranded event so
1624                // nothing was actually lost. Clear the flag so
1625                // operator dashboards alerting on
1626                // `was_lossy && events_dropped == 0` don't see a
1627                // false positive. Pre-fix the boolean stayed
1628                // `true` against `events_dropped == 0` for any
1629                // deadline-triggered shutdown whose drain
1630                // happened to catch up.
1631                self.stats
1632                    .shutdown_was_lossy
1633                    .store(false, AtomicOrdering::Release);
1634            }
1635            tracing::warn!(
1636                stranded_at_deadline = stranded,
1637                post_deadline_ingests,
1638                actual_drops,
1639                "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1640                 by stranded - post-deadline-ingests (pre-fix this bumped by \
1641                 the full `stranded` count, double-counting events the drain \
1642                 still successfully delivered)",
1643            );
1644        }
1645
1646        // Mark shutdown as completed so Drop knows not to warn.
1647        self.shutdown_completed.store(true, AtomicOrdering::Release);
1648        result
1649    }
1650
1651    /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1652    /// not imply the shutdown work has finished. Use
1653    /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1654    /// completion.
1655    pub fn is_shutdown(&self) -> bool {
1656        self.shutdown.load(AtomicOrdering::Acquire)
1657    }
1658
1659    /// True once `shutdown` / `shutdown_via_ref` has fully drained
1660    /// workers and the adapter shutdown returned (success path only).
1661    pub fn is_shutdown_completed(&self) -> bool {
1662        self.shutdown_completed.load(AtomicOrdering::Acquire)
1663    }
1664
1665    /// Get shard metrics (if dynamic scaling is enabled).
1666    pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1667        self.shard_manager.collect_metrics()
1668    }
1669
1670    /// Check if dynamic scaling is enabled.
1671    pub fn is_dynamic_scaling_enabled(&self) -> bool {
1672        self.config.scaling.is_some()
1673    }
1674
1675    /// Manually trigger a scale-up (for testing or manual intervention).
1676    ///
1677    /// Bypasses the auto-scaling cooldown so a deliberate operator
1678    /// request isn't rate-limited by the auto-scaling cadence.
1679    /// Pre-fix this looped `add_shard_internal()` N times, each
1680    /// of which bumped `last_scaling`, so iteration 1+ failed
1681    /// with `InCooldown` against any non-zero cooldown — the
1682    /// first shard was left half-added (workers spawned, routing
1683    /// entry installed) while the error propagated to the
1684    /// caller. The `max_shards` budget check still applies.
1685    pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1686        let mut new_ids = Vec::with_capacity(count as usize);
1687        for _ in 0..count {
1688            let id = self.add_shard_internal_force().await?;
1689            new_ids.push(id);
1690        }
1691        Ok(new_ids)
1692    }
1693
1694    /// Manually trigger a scale-down (for testing or manual intervention).
1695    ///
1696    /// Marks `count` shards as `Draining`, waits for them to empty,
1697    /// finalizes them to `Stopped`, and removes them from the
1698    /// routing table — mirroring the scaling monitor's per-tick
1699    /// finalize loop. Returns the IDs of shards that were
1700    /// successfully drained AND removed (subset of those marked
1701    /// Draining if any failed to empty within the deadline).
1702    ///
1703    /// Drives the full scale-down lifecycle synchronously: a
1704    /// plain `mapper.scale_down` call marks shards `Draining` but
1705    /// does NOT finalize them — finalization is the scaling
1706    /// monitor's responsibility. Bus configs without an active
1707    /// monitor (or callers that shut down before the monitor's
1708    /// next tick) would otherwise lose any events queued in those
1709    /// shards' ring buffers.
1710    pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1711        let mapper = self
1712            .shard_manager
1713            .mapper()
1714            .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1715
1716        let drained_ids = mapper
1717            .scale_down(count)
1718            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1719
1720        // `finalize_draining` requires the shard to have been
1721        // Draining for >100ms with an empty ring buffer and no
1722        // pushes since drain start. Poll until every requested
1723        // shard finalizes, capped by an outer deadline so a wedged
1724        // producer can't pin this method forever. Use
1725        // `tokio::time::Instant` so tests under `tokio::time::pause()`
1726        // advance the virtual clock via `sleep` rather than spinning
1727        // until wall-clock catches up.
1728        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1729        let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1730        let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1731
1732        while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1733            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1734            let stopped = mapper.finalize_draining();
1735            // `finalize_draining` is destructive — every qualifying
1736            // Draining shard transitions to Stopped in one shot,
1737            // regardless of who initiated the drain. Pre-fix the
1738            // `if target.contains(&shard_id)` filter dropped non-
1739            // target ids on the floor; if the scaling monitor (or
1740            // a parallel `manual_scale_down` on a different target
1741            // set) finalized one of THEIR shards in the same tick,
1742            // that shard ended up Stopped with workers + routing
1743            // entry intact — leaked. Always tear down via
1744            // `remove_shard_internal` so the bus-side state
1745            // (workers, sender, routing) is freed; only count the
1746            // target subset toward the returned Vec.
1747            for shard_id in stopped {
1748                let _ = self.remove_shard_internal(shard_id).await;
1749                if target.contains(&shard_id) {
1750                    finalized.insert(shard_id);
1751                }
1752            }
1753        }
1754
1755        // Surface partial success at WARN level. The return shape
1756        // is preserved for compat (changing to `(Vec, Vec)` would
1757        // break the existing callers + test) so a smaller-than-
1758        // target list could otherwise be silently mistaken for full
1759        // success; the WARN log gives operations tooling something
1760        // to alert on.
1761        if finalized.len() < target.len() {
1762            let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1763            tracing::warn!(
1764                requested = target.len(),
1765                finalized = finalized.len(),
1766                still_draining = ?still_draining,
1767                "manual_scale_down deadline elapsed before all targeted \
1768                 shards finalized — events still in-flight on the listed \
1769                 shards. They will finalize on the next scaling-monitor \
1770                 tick or on shutdown."
1771            );
1772        }
1773
1774        Ok(finalized.into_iter().collect())
1775    }
1776}
1777
1778impl Drop for EventBus {
1779    fn drop(&mut self) {
1780        // Signal shutdown so background tasks (drain workers, batch
1781        // workers, scaling monitor) observe the flag and exit. We
1782        // cannot await futures in Drop, but setting the atomic flag
1783        // triggers eventual termination.
1784        //
1785        // Previously used `Release` here while `try_enter_ingest`
1786        // and `shutdown()` use `SeqCst` on the same flag. `&mut self`
1787        // exclusion makes that sound today (no concurrent ingest can
1788        // observe a half-published shutdown). The mismatch is purely
1789        // defensive — switching to `SeqCst` matches the rest of the
1790        // lifecycle and removes a footgun if a future change ever
1791        // opened a path where `Drop` overlaps an in-flight
1792        // `try_enter_ingest`.
1793        self.shutdown.store(true, AtomicOrdering::SeqCst);
1794        // Also release the drain-finalize gate so any drain worker
1795        // already parked waiting for it can proceed and exit. Without
1796        // this, drop-without-shutdown leaves drain workers blocked on
1797        // `drain_finalize_ready` until their internal deadline fires
1798        // (which delays task cleanup by `DRAIN_FINALIZE_TIMEOUT`).
1799        // Best-effort durability: drop never gets the in-flight wait,
1800        // so any push that lands after this point is still lost.
1801        self.drain_finalize_ready
1802            .store(true, AtomicOrdering::SeqCst);
1803
1804        // Workers do NOT hold `Arc<EventBus>` — they hold
1805        // independent `Arc<ShardManager>` / `Arc<dyn Adapter>`
1806        // clones plus the channel halves. When `Drop` returns,
1807        // those Arcs survive in the still-running tasks and they
1808        // continue draining / dispatching until they observe the
1809        // shutdown flag we just set. There's no partial-Drop UB
1810        // risk: nothing on the worker side dereferences a
1811        // dropped EventBus field. The flags promote the
1812        // worker tasks from "blocked on recv / parked on
1813        // drain_finalize_ready" to "observe shutdown=true and
1814        // exit" so they don't linger indefinitely.
1815        //
1816        // If `shutdown()` was never awaited, any events still in the
1817        // per-shard ring buffers or mpsc channels are lost — the
1818        // adapter's `flush()` and `shutdown()` won't run, so durable
1819        // backends never see them. We can't fix that from `Drop` (no
1820        // async), but we *can* surface the data-loss risk loudly so
1821        // it doesn't hide. The check is bounded to "shutdown was
1822        // never started"; an in-progress shutdown is fine because the
1823        // call site is awaiting it.
1824        if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1825            // Count events still sitting in shard ring buffers. They
1826            // are stranded — the drain workers will see `shutdown =
1827            // true` and exit without flushing, the adapter's
1828            // `flush()`/`shutdown()` never run, so anything in the
1829            // rings at this point is permanently lost. Surface that
1830            // loss via `events_dropped` so post-mortem stats reflect
1831            // reality (operators alerting on `events_dropped > 0`
1832            // would otherwise miss the entire incident), and set
1833            // `shutdown_was_lossy` so the boolean view is consistent
1834            // with the counter view.
1835            //
1836            // Events in the BatchWorker mpsc channels or pending
1837            // batches are not counted here — those workers may still
1838            // observe the shutdown flag and exit, but we have no
1839            // synchronous way from Drop to enumerate them. The ring-
1840            // buffer count is a lower bound on the stranded total.
1841            //
1842            // Use the non-blocking accessor: if `Drop` runs on a
1843            // thread that already holds a shard mutex (single-thread
1844            // runtime + panic during shutdown is the canonical
1845            // hazard), the blocking `total_pending_in_rings` would
1846            // self-deadlock. Best-effort accounting is the right
1847            // trade-off here — we'd rather under-report stranded
1848            // events than wedge the drop forever.
1849            let (stranded_in_rings, uncounted_shards) =
1850                self.shard_manager.try_total_pending_in_rings();
1851            if uncounted_shards > 0 {
1852                tracing::warn!(
1853                    uncounted_shards,
1854                    "EventBus::drop: {uncounted_shards} shard(s) were locked at \
1855                     drop time and could not be accounted for in stranded_in_rings"
1856                );
1857            }
1858            if stranded_in_rings > 0 {
1859                self.stats
1860                    .events_dropped
1861                    .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
1862                self.stats
1863                    .shutdown_was_lossy
1864                    .store(true, AtomicOrdering::Release);
1865            }
1866
1867            let stats = self.shard_manager.stats();
1868            tracing::warn!(
1869                events_ingested = stats.events_ingested,
1870                events_dropped = stats.events_dropped,
1871                stranded_in_rings,
1872                "EventBus dropped without an awaited shutdown(). Any in-flight \
1873                 events still in the ring buffers or batch channels will be lost \
1874                 — the adapter's flush()/shutdown() never ran. Call \
1875                 `bus.shutdown().await` before dropping for durable shutdown."
1876            );
1877        }
1878    }
1879}
1880
/// How long a second `shutdown_via_ref` caller waits for the first caller
/// to finish (non-test builds): a fixed 10 s.
#[cfg(not(test))]
fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
    const SPIN_DEADLINE_SECS: u64 = 10;
    std::time::Duration::from_secs(SPIN_DEADLINE_SECS)
}
1887
/// Test-only millisecond override for the `shutdown_via_ref` spin
/// deadline. `0` (the initial value) selects the production 10 s; any
/// other value replaces it. It is written by
/// [`set_shutdown_via_ref_spin_deadline_for_test`], so a test can exercise
/// the deadline-elapsed path without waiting 10 s of wall clock.
///
/// This is ONE process-wide atomic shared by every test in the
/// `cargo test --lib` binary, so concurrent overrides would leak between
/// tests. A test that sets it must hold the
/// [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] mutex from before the set until
/// after it resets the value to `0`. See
/// [`set_shutdown_via_ref_spin_deadline_for_test`].
#[cfg(test)]
static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: AtomicU64 = AtomicU64::new(0);
1904
/// Mutex that serializes tests touching the spin-deadline override, so no
/// test observes another test's transient value. Acquire it through
/// [`shutdown_deadline_override_lock`] and keep the guard alive until the
/// override has been reset to `0`.
///
/// This is a `tokio::sync::Mutex` rather than a `std::sync::Mutex`
/// because the guard is intentionally held across `.await` points for the
/// whole guarded test. `clippy::await_holding_lock` would flag that with
/// the std mutex, and rightly so in production code.
#[cfg(test)]
static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> =
    tokio::sync::Mutex::const_new(());
1918
/// Lock the mutex that protects the spin-deadline override and return
/// its guard. A test that changes the override keeps this guard alive
/// across both the set and the reset, so tests running concurrently
/// always see the default.
#[cfg(test)]
pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
    let guard_mutex = &SHUTDOWN_DEADLINE_OVERRIDE_GUARD;
    guard_mutex.lock().await
}
1927
/// Test-build spin deadline for the second-caller path in
/// `shutdown_via_ref`. Uses the millisecond override when one is set,
/// otherwise the production 10s.
#[cfg(test)]
fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
    // A stored value of 0 means "no override installed".
    match SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(AtomicOrdering::Relaxed) {
        0 => std::time::Duration::from_secs(10),
        override_ms => std::time::Duration::from_millis(override_ms),
    }
}
1937
/// Test-only: install a spin-deadline override of `ms` milliseconds.
/// Passing `0` restores the production 10s default. Callers must hold
/// the guard returned by [`shutdown_deadline_override_lock`] while the
/// override is non-zero.
#[cfg(test)]
pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
    let override_slot = &SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS;
    override_slot.store(ms, AtomicOrdering::Relaxed);
}
1942
#[cfg(test)]
impl EventBus {
    /// Test-only: raise `in_flight_ingests` by `n`, as if `n` producers were
    /// stuck partway through `shard_manager.ingest`.
    ///
    /// The shutdown spin loop consults `in_flight_ingests` to decide whether
    /// work is still outstanding. Lossy-shutdown reconciliation tests need to
    /// move that counter on its own. Going through the real
    /// `try_enter_ingest` / `ingest_complete` path would also advance
    /// `events_ingested` and muddle the (stranded, post_deadline_ingests)
    /// accounting under test.
    ///
    /// The caller owns the lifecycle. Every `stage_stranded_ingest(n)` MUST
    /// be balanced by [`Self::release_stranded_ingest`] (or
    /// [`Self::complete_stranded_ingest`]) with the same `n` before the bus
    /// is dropped. Otherwise the bus is dropped with a non-zero in-flight
    /// count, which the `Drop` impl is expected to treat as an invariant
    /// violation.
    pub(crate) fn stage_stranded_ingest(&self, n: u64) {
        let in_flight = &self.in_flight_ingests;
        in_flight.fetch_add(n, AtomicOrdering::SeqCst);
    }

    /// Test-only: undo [`Self::stage_stranded_ingest`] by lowering
    /// `in_flight_ingests` by `n`, leaving `events_ingested` untouched.
    ///
    /// This models producers that the shutdown deadline stranded and that
    /// never finished. The drain reconciliation should count them as drops.
    pub(crate) fn release_stranded_ingest(&self, n: u64) {
        let in_flight = &self.in_flight_ingests;
        in_flight.fetch_sub(n, AtomicOrdering::SeqCst);
    }

    /// Test-only: model a previously stranded producer that finishes AFTER
    /// the shutdown deadline.
    ///
    /// `events_ingested` is bumped first, with Release ordering to pair with
    /// the Acquire read in the post-drain reconciliation. Only then is the
    /// in-flight slot given back. This mirrors the order a real producer's
    /// `ingest` call site uses, so the reconciliation's
    /// (ingested_after - ingested_at_deadline) arithmetic sees the same
    /// sequence of updates.
    pub(crate) fn complete_stranded_ingest(&self, n: u64) {
        let ingested = &self.stats.events_ingested;
        ingested.fetch_add(n, AtomicOrdering::Release);
        self.release_stranded_ingest(n);
    }
}
1990
/// Background scaling monitor for an [`EventBus`].
///
/// Only a `Weak<EventBus>` is captured. A strong reference is taken at the
/// top of each tick and explicitly dropped at its end, so the task never
/// holds the bus alive while it sleeps.
///
/// Each tick does the following:
/// 1. Sleep for the scaling policy's `metrics_window`.
/// 2. If the bus is still alive and not shutting down, log metrics for
///    every shard whose `fill_ratio` exceeds 0.5.
/// 3. Act on `shard_manager.evaluate_scaling()`: add shards on
///    `ScaleUp`, or start draining on `ScaleDown`.
/// 4. Call `remove_shard_internal` for every shard id returned by
///    `mapper.finalize_draining()`.
///
/// The task returns when dynamic scaling is not configured
/// (`config.scaling` is `None`), when the last strong reference to the bus
/// is gone, or once the `shutdown` flag is observed set.
async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
    // Refresh `interval` from the policy on every tick. The previous
    // version cached it once at task start, so any future runtime
    // policy update would not be adopted by the monitor without a
    // process restart. Today `EventBusConfig` is immutable
    // post-construction so this is a no-op — but reading it each tick
    // is cheap (a `Weak::upgrade` plus a field read) and removes the
    // latent footgun.
    loop {
        // The upgraded `Arc` bound here lives only inside this `let`, so
        // no strong reference is held across the sleep below.
        let interval = match weak.upgrade() {
            Some(bus) => match &bus.config.scaling {
                Some(p) => p.metrics_window,
                None => return,
            },
            None => return,
        };
        tokio::time::sleep(interval).await;

        let bus = match weak.upgrade() {
            Some(b) => b,
            // Last strong ref dropped — caller is shutting down (or
            // already gone). Exit cleanly.
            None => break,
        };

        // SeqCst to match the writer side (`EventBus::shutdown` /
        // `Drop`). The Acquire/Release handshake on
        // `drain_finalize_ready` already provides the load-bearing
        // happens-before today — but a future code change that
        // piggybacks on `shutdown`'s ordering (e.g. a producer that
        // observes shutdown without going through
        // `try_enter_ingest`) would silently break under Relaxed.
        // Aligning the read-side ordering with the writer-side
        // SeqCst is a one-instruction tax for the safety.
        if bus.shutdown.load(AtomicOrdering::SeqCst) {
            break;
        }

        // Collect metrics for observability. Only shards more than half
        // full are logged, at debug level.
        if let Some(metrics) = bus.shard_manager.collect_metrics() {
            for m in &metrics {
                if m.fill_ratio > 0.5 {
                    tracing::debug!(
                        shard_id = m.shard_id,
                        fill_ratio = m.fill_ratio,
                        event_rate = m.event_rate,
                        "Shard metrics"
                    );
                }
            }
        }

        // Evaluate scaling.
        match bus.shard_manager.evaluate_scaling() {
            ScalingDecision::ScaleUp(count) => {
                tracing::info!(count = count, "Scaling up shards");
                // Stop at the first failure rather than retrying; the next
                // tick re-evaluates from scratch.
                for _ in 0..count {
                    if let Err(e) = bus.add_shard_internal().await {
                        tracing::error!(error = %e, "Failed to add shard");
                        break;
                    }
                }
            }
            ScalingDecision::ScaleDown(count) => {
                tracing::info!(count = count, "Scaling down shards");
                // NOTE(review): an `Err` from `scale_down` and any result
                // of `drain_shard` are discarded without logging here.
                // Confirm that this best-effort behavior is intended.
                if let Some(mapper) = bus.shard_manager.mapper() {
                    if let Ok(drained) = mapper.scale_down(count) {
                        for shard_id in drained {
                            let _ = bus.shard_manager.drain_shard(shard_id);
                        }
                    }
                }
            }
            ScalingDecision::None => {}
        }

        // Tear down shards whose draining has completed.
        // NOTE(review): the result of `remove_shard_internal` is discarded,
        // so a failed removal is not logged here.
        if let Some(mapper) = bus.shard_manager.mapper() {
            let stopped = mapper.finalize_draining();
            for shard_id in stopped {
                let _ = bus.remove_shard_internal(shard_id).await;
            }
        }

        // CRITICAL: drop the strong ref BEFORE the next sleep so a
        // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
        // the last strong ref while we're sleeping.
        drop(bus);
    }
}
2080
2081/// Spawn a batch worker for a shard.
2082/// Dispatch a batch to the adapter with timeout and optional retries.
2083/// Returns true if the batch was accepted, false if all attempts failed.
2084///
2085/// Non-retryable errors (e.g. `AdapterError::Connection`,
2086/// `AdapterError::Fatal`, `AdapterError::Serialization`) skip the
2087/// retry loop and drop the batch immediately. Retrying a fatal error
2088/// just delays the inevitable while burning CPU and amplifying log
2089/// noise. Use `AdapterError::is_retryable` as the single source of
2090/// truth for this decision.
2091/// Compute the per-attempt backoff for `dispatch_batch` retries.
2092///
2093/// Pre-fix the retry loop slept a flat `Duration::from_millis(100)`
2094/// after every failure. Under a partial backend outage (Redis /
2095/// JetStream slow but not dead), every shard's BatchWorker retried
2096/// on the exact same 100 ms cadence, producing a synchronized
2097/// retry storm that amplified load while the backend was
2098/// recovering.
2099///
2100/// Post-fix: exponential backoff (100, 200, 400, 800, 1600, 3200 ms)
2101/// with per-(shard, attempt) jitter to decorrelate retries across
2102/// shards. Capped at attempt=5 (3.2 s base) so retries don't grow
2103/// unboundedly. Jitter is `[-25%, +25%]` of the base, derived from
2104/// hashing `(shard_id, attempt)` — deterministic per shard but
2105/// uncorrelated across shards, which is exactly what the storm
2106/// mitigation needs.
2107fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
2108    use std::collections::hash_map::DefaultHasher;
2109    use std::hash::{Hash, Hasher};
2110
2111    // 100 ms × 2^attempt, capped at attempt=5 → 100/200/400/800/1600/3200.
2112    let base_ms: u64 = 100u64.saturating_mul(1u64 << attempt.min(5));
2113
2114    let mut hasher = DefaultHasher::new();
2115    shard_id.hash(&mut hasher);
2116    attempt.hash(&mut hasher);
2117    let h = hasher.finish();
2118    // Jitter in [0, base_ms/2), centered to give [-25%, +25%].
2119    let jitter_range = (base_ms / 2).max(1);
2120    let jitter = (h % jitter_range) as i64 - (jitter_range as i64 / 2);
2121    let final_ms = (base_ms as i64 + jitter).max(1) as u64;
2122
2123    Duration::from_millis(final_ms)
2124}
2125
2126async fn dispatch_batch(
2127    adapter: &dyn Adapter,
2128    batch: Arc<Batch>,
2129    shard_id: u16,
2130    timeout: Duration,
2131    retries: u32,
2132) -> bool {
2133    // Retry attempts clone the `Arc` (refcount bump only); the final
2134    // attempt moves it. Pre-fix the batch was `Batch` and every retry
2135    // deep-cloned the events `Vec` — at the common `retries == 0`
2136    // path this was a wasted N×Bytes refcount bump + Vec alloc on
2137    // every batch.
2138    for attempt in 0..retries {
2139        match tokio::time::timeout(timeout, adapter.on_batch(Arc::clone(&batch))).await {
2140            Ok(Ok(())) => return true,
2141            Ok(Err(e)) => {
2142                if !e.is_retryable() {
2143                    // Tag with a `reason` field so this
2144                    // distinct drop cause is separately filterable
2145                    // from retry-exhausted and timeout in
2146                    // observability tools. Shutdown is distinguished
2147                    // from generic non-retryable so an operator
2148                    // chasing "why are batches being dropped" can
2149                    // immediately tell a sequencing bug (sending to
2150                    // a stopped adapter) from a transport / config
2151                    // failure.
2152                    let reason = if e.is_shutdown() {
2153                        "adapter_shutdown"
2154                    } else {
2155                        "non_retryable"
2156                    };
2157                    tracing::error!(
2158                        shard_id,
2159                        error = %e,
2160                        attempt,
2161                        reason,
2162                        "Non-retryable error from adapter, dropping batch"
2163                    );
2164                    return false;
2165                }
2166                tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2167            }
2168            Err(_) => {
2169                tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2170            }
2171        }
2172        tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2173    }
2174
2175    // Pre-fix the final attempt collapsed every drop into
2176    // one log line ("Failed to dispatch batch, dropping"), making
2177    // it impossible to tell retry-exhausted from fatal-non-
2178    // retryable from timeout in metrics. The non-retryable case
2179    // already has its own log inside the retry loop above (early
2180    // return); here we tag retry-exhausted vs timeout-after-
2181    // retries with distinct `reason` fields so log-based
2182    // observability tools can break the drops out by cause.
2183    match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2184        Ok(Ok(())) => true,
2185        Ok(Err(e)) => {
2186            tracing::error!(
2187                shard_id,
2188                error = %e,
2189                reason = "retry_exhausted",
2190                attempts = retries + 1,
2191                "Failed to dispatch batch after exhausting retries, dropping"
2192            );
2193            false
2194        }
2195        Err(_) => {
2196            tracing::error!(
2197                shard_id,
2198                reason = "timeout",
2199                attempts = retries + 1,
2200                "Adapter on_batch timed out on final attempt, dropping batch"
2201            );
2202            false
2203        }
2204    }
2205}
2206
/// Everything [`spawn_batch_worker`] needs to run one shard's batch worker,
/// bundled into a struct so the spawn function takes a single argument.
/// The struct is destructured as soon as the worker is spawned.
struct BatchWorkerParams {
    /// Shard this worker batches for. It is used for logging, backoff
    /// jitter and per-shard dispatch metrics.
    shard_id: u16,
    /// Receiving end of the shard's batch channel. The drain worker for
    /// this shard sends popped ring-buffer events here. The worker exits
    /// once every sender has been dropped.
    rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
    /// Adapter that completed batches are dispatched to.
    adapter: Arc<dyn Adapter>,
    /// Used to look up the shard and record successful dispatches on it.
    shard_manager: Arc<ShardManager>,
    /// Batch sizing and flush policy. It is handed to `BatchWorker::new`,
    /// and its `max_delay` is the receive timeout when no batch is pending.
    config: BatchConfig,
    /// Timeout applied to each individual `on_batch` attempt.
    adapter_timeout: Duration,
    /// Number of retries after the first failed attempt. Zero means a
    /// single attempt.
    batch_retries: u32,
    /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
    /// stores its post-flush sequence here on every dispatch so the
    /// bus can read it after the worker exits — see
    /// `ShardWorkers::next_sequence` for the consumer side.
    next_sequence: Arc<AtomicU64>,
    /// Bus-level stats. The worker increments
    /// `batches_dispatched` and `events_dispatched` after every
    /// successful `dispatch_batch`. Both must actually be
    /// incremented here, otherwise `flush()`'s Phase 2 progress
    /// probe would always observe zero progress and early-break
    /// after a single `max_delay` window — racing the
    /// BatchWorker's first `recv_timeout` and flaking on
    /// Windows-class timer resolution.
    stats: Arc<EventBusStats>,
    /// Producer nonce stamped on every batch the worker emits.
    /// Bus-loaded from the persistent path when
    /// `producer_nonce_path` is configured, otherwise from the
    /// per-process default.
    producer_nonce: u64,
}
2235
2236fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2237    let BatchWorkerParams {
2238        shard_id,
2239        mut rx,
2240        adapter,
2241        shard_manager,
2242        config,
2243        adapter_timeout,
2244        batch_retries,
2245        next_sequence,
2246        stats,
2247        producer_nonce,
2248    } = params;
2249    tokio::spawn(async move {
2250        let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2251
2252        loop {
2253            // Wait for events with timeout. The batch worker exits
2254            // only when its channel is closed — i.e. after every
2255            // upstream sender (the drain worker for this shard +
2256            // `EventBus::batch_senders`) has been dropped.
2257            // `EventBus::shutdown` enforces that ordering so no
2258            // event is left stranded in the channel.
2259            let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2260
2261            match tokio::time::timeout(recv_timeout, rx.recv()).await {
2262                Ok(Some(events)) => {
2263                    if let Some(batch) = worker.add_events(events) {
2264                        let batch_len = batch.len() as u64;
2265                        if dispatch_batch(
2266                            &*adapter,
2267                            Arc::new(batch),
2268                            shard_id,
2269                            adapter_timeout,
2270                            batch_retries,
2271                        )
2272                        .await
2273                        {
2274                            stats
2275                                .batches_dispatched
2276                                .fetch_add(1, AtomicOrdering::Relaxed);
2277                            stats
2278                                .events_dispatched
2279                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2280                            if let Some(shard_ref) = shard_manager.shard(shard_id) {
2281                                shard_ref.lock().record_batch_dispatch();
2282                            }
2283                        }
2284                    }
2285                }
2286                Ok(None) => {
2287                    // Channel closed — drain any pending and exit.
2288                    if worker.has_pending() {
2289                        let batch = worker.flush();
2290                        if !batch.is_empty() {
2291                            let batch_len = batch.len() as u64;
2292                            if dispatch_batch(
2293                                &*adapter,
2294                                Arc::new(batch),
2295                                shard_id,
2296                                adapter_timeout,
2297                                batch_retries,
2298                            )
2299                            .await
2300                            {
2301                                stats
2302                                    .batches_dispatched
2303                                    .fetch_add(1, AtomicOrdering::Relaxed);
2304                                stats
2305                                    .events_dispatched
2306                                    .fetch_add(batch_len, AtomicOrdering::Relaxed);
2307                            }
2308                        }
2309                    }
2310                    break;
2311                }
2312                Err(_) => {
2313                    // Timeout - check if we need to flush.
2314                    // Pre-fix [perf #38] called `worker.add_events(vec![])`
2315                    // as the signal, allocating an empty `Vec` per
2316                    // timeout tick. `check_timeout` is the direct
2317                    // expression of intent — no alloc.
2318                    if let Some(batch) = worker.check_timeout() {
2319                        let batch_len = batch.len() as u64;
2320                        if dispatch_batch(
2321                            &*adapter,
2322                            Arc::new(batch),
2323                            shard_id,
2324                            adapter_timeout,
2325                            batch_retries,
2326                        )
2327                        .await
2328                        {
2329                            stats
2330                                .batches_dispatched
2331                                .fetch_add(1, AtomicOrdering::Relaxed);
2332                            stats
2333                                .events_dispatched
2334                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2335                            if let Some(shard_ref) = shard_manager.shard(shard_id) {
2336                                shard_ref.lock().record_batch_dispatch();
2337                            }
2338                        }
2339                    }
2340                }
2341            }
2342        }
2343    })
2344}
2345
/// Upper bound on how long a drain worker waits for
/// `drain_finalize_ready` after it observes `shutdown == true`.
///
/// This is defense in depth. If the bus is dropped mid-shutdown without
/// the gate ever being set, the worker logs a warning and proceeds after
/// this long, instead of staying pinned forever. The shutdown path is
/// meant to set the gate every time, even when it hits its own timeout,
/// so in normal operation this deadline is never reached.
const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_millis(10_000);
2353
/// Spawn a drain worker for a single shard.
///
/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle
/// allocation happens *outside* the shard mutex critical section.
/// Each cycle: lock → drain into scratch (no alloc, capacity already
/// reserved) → unlock → `mem::replace` swaps the filled scratch out
/// for a fresh empty `Vec` (alloc *outside* the lock) → send the
/// filled batch on the channel.
///
/// The worker stops in one of three ways:
/// - It observes `shutdown`. It then waits up to [`DRAIN_FINALIZE_TIMEOUT`]
///   for `drain_finalize_ready` and sweeps the shard until two consecutive
///   passes pop nothing, then exits.
/// - `with_shard` returns `None` in the steady-state loop, meaning the shard
///   no longer exists.
/// - `send` fails because the batch worker's receiver is gone. The batch
///   just popped is then lost, and the loss is logged at `error` level.
///
/// Parameters:
/// - `shard_id`: the shard whose ring buffer this worker drains.
/// - `shard_manager`: used to run `pop_batch_into` on the shard.
/// - `sender`: the channel to this shard's batch worker.
/// - `shutdown`: the bus-wide shutdown flag.
/// - `drain_finalize_ready`: the gate released by the shutdown path once
///   in-flight ingests have drained.
///
/// Returns the spawned task's `JoinHandle`.
fn spawn_drain_worker_for_shard(
    shard_id: u16,
    shard_manager: Arc<ShardManager>,
    sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
    shutdown: Arc<AtomicBool>,
    drain_finalize_ready: Arc<AtomicBool>,
) -> JoinHandle<()> {
    // Per-call limits passed to `pop_batch_into`. The steady-state loop
    // uses STEADY_BATCH and the final shutdown sweep uses FINAL_BATCH.
    const STEADY_BATCH: usize = 1_000;
    const FINAL_BATCH: usize = 10_000;

    tokio::spawn(async move {
        let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);

        loop {
            // SeqCst to match the writer side (`EventBus::shutdown` /
            // `Drop`). `try_enter_ingest` itself uses SeqCst, and
            // the Acquire/Release handshake on
            // `drain_finalize_ready` (below) is what actually makes
            // the producer-push happen-before visible. Aligning to
            // SeqCst here makes the contract robust to future
            // producer-side changes that might piggyback on
            // `shutdown`'s ordering.
            if shutdown.load(AtomicOrdering::SeqCst) {
                // Before doing the final sweep, wait for `shutdown()`
                // to release the finalize gate. The gate is set only
                // after the in-flight ingest counter reaches zero,
                // which means every producer that read `shutdown=false`
                // has completed its push. Without this wait, the drain
                // worker can race ahead of a late push under
                // shard-mutex serialization (drain takes the lock
                // first, sees nothing, exits; producer then takes the
                // lock and pushes — event stranded).
                //
                // Acquire pairs with the Release in `EventBus::shutdown`
                // and `EventBus::drop`, transitively making every
                // producer push that happened-before its `in_flight`
                // decrement visible to the subsequent `pop_batch_into`.
                // `tokio::time::Instant` so virtualized-clock tests
                // (`tokio::time::pause`) advance the deadline via
                // `sleep` rather than spinning until wall-clock catches
                // up.
                let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
                while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
                    if tokio::time::Instant::now() >= finalize_deadline {
                        tracing::warn!(
                            shard_id,
                            "drain worker timed out waiting for finalize gate; \
                             proceeding with potential event loss"
                        );
                        break;
                    }
                    // Park instead of `yield_now` so we don't
                    // starve the workers / producers we're waiting
                    // on under contention.
                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
                }

                // Final drain: loop until the ring buffer is empty.
                // A single 10k batch is not enough — the ring
                // buffer can hold up to `ring_buffer_capacity`
                // events (default 1M) and any leftover would be
                // silently lost on shutdown.
                //
                // Pre-fix this broke at the first
                // `popped == 0`. The audit posited a narrow race
                // where a producer that fetch_add'd
                // in_flight_ingests but stalled before the
                // shard-lock body could push AFTER shutdown
                // observed in_flight=0 yet BEFORE this final
                // sweep saw the event. The SeqCst guard pattern
                // makes this unlikely (the push happens-before
                // the guard drop), but the defense is cheap:
                // require TWO consecutive zero-event passes
                // before declaring drain. The yield_now between
                // them gives a stalled producer a chance to land
                // the push.
                let mut final_scratch: Vec<crate::event::InternalEvent> =
                    Vec::with_capacity(FINAL_BATCH);
                let mut consecutive_zeros = 0u32;
                loop {
                    // A missing shard is treated the same as an empty one.
                    let popped = shard_manager
                        .with_shard(shard_id, |shard| {
                            shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
                        })
                        .unwrap_or(0);
                    if popped == 0 {
                        consecutive_zeros += 1;
                        if consecutive_zeros >= 2 {
                            break;
                        }
                        // Yield to let any racing producer commit
                        // its push, then re-poll.
                        tokio::task::yield_now().await;
                        continue;
                    }
                    consecutive_zeros = 0;
                    let batch =
                        std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
                    let batch_len = batch.len();
                    if let Err(_send_err) = sender.send(batch).await {
                        // Batch worker exited before drain. The
                        // `mem::replace` already pulled events out
                        // of the ring buffer, so the dropped batch
                        // is unrecoverable — the SendError carries
                        // it back but the consumer is gone. Surface
                        // the count loudly so the loss is
                        // observable in operator dashboards rather
                        // than a silent miss in shutdown stats.
                        tracing::error!(
                            shard_id,
                            dropped = batch_len,
                            "drain worker (final): batch worker dropped \
                             channel before final drain completed; \
                             events removed from ring buffer cannot be redelivered",
                        );
                        break;
                    }
                }
                break;
            }

            // Drain events from ring buffer.
            let popped = shard_manager.with_shard(shard_id, |shard| {
                shard.pop_batch_into(&mut scratch, STEADY_BATCH)
            });

            match popped {
                Some(0) => {
                    // No events — yield briefly. The 100μs sleep is deliberate:
                    // this is a latency-first system where the drain loop is the
                    // hot path. Longer backoff would add milliseconds of latency
                    // to the first event after a quiet period, violating the
                    // sub-microsecond design target. The CPU cost of 100μs polling
                    // is acceptable for a system that processes 10M+ events/sec.
                    //
                    // NOTE(review): tokio's timer operates at millisecond
                    // granularity, so this sleep likely parks for about 1 ms,
                    // not 100μs. Confirm whether the latency claim above holds.
                    tokio::time::sleep(Duration::from_micros(100)).await;
                }
                Some(_) => {
                    let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
                    let batch_len = batch.len();
                    if let Err(_send_err) = sender.send(batch).await {
                        // Steady-state: the only way the batch
                        // worker drops the channel is if it
                        // panicked or `remove_shard_internal`
                        // tore it down out of order with the
                        // drain worker (which the documented
                        // shutdown sequence forbids). Either way,
                        // the events are unrecoverable — the
                        // `mem::replace` above already pulled them
                        // out of the ring buffer. Pre-fix this
                        // simply `break`-d, leaving the loss
                        // invisible. Surface a loud error with
                        // the dropped count so an out-of-order
                        // shutdown or batch-worker panic shows up
                        // in dashboards rather than as a silent
                        // metric gap.
                        tracing::error!(
                            shard_id,
                            dropped = batch_len,
                            "drain worker: batch worker dropped channel \
                             during steady-state drain; events removed from \
                             ring buffer cannot be redelivered",
                        );
                        break;
                    }
                }
                None => {
                    // Shard no longer exists (was removed)
                    break;
                }
            }
        }
    })
}
2535
2536#[cfg(test)]
2537mod tests {
2538    use super::*;
2539    use crate::shard::ScalingPolicy;
2540    use serde_json::json;
2541
2542    #[tokio::test]
2543    async fn test_event_bus_basic() {
2544        let config = EventBusConfig::builder()
2545            .num_shards(2)
2546            .ring_buffer_capacity(1024)
2547            .build()
2548            .unwrap();
2549
2550        let bus = EventBus::new(config).await.unwrap();
2551
2552        // Ingest some events
2553        for i in 0..10 {
2554            let event = Event::new(json!({"index": i}));
2555            bus.ingest(event).unwrap();
2556        }
2557
2558        // Give workers time to process
2559        tokio::time::sleep(Duration::from_millis(100)).await;
2560
2561        // Check stats
2562        assert_eq!(
2563            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2564            10
2565        );
2566
2567        bus.shutdown().await.unwrap();
2568    }
2569
2570    #[tokio::test]
2571    async fn test_event_bus_batch_ingest() {
2572        let config = EventBusConfig::default();
2573        let bus = EventBus::new(config).await.unwrap();
2574
2575        let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2576
2577        let ingested = bus.ingest_batch(events);
2578        assert_eq!(ingested, 100);
2579
2580        bus.shutdown().await.unwrap();
2581    }
2582
2583    #[tokio::test]
2584    async fn test_event_bus_with_dynamic_scaling() {
2585        let policy = ScalingPolicy {
2586            min_shards: 2,
2587            max_shards: 8,
2588            ..Default::default()
2589        };
2590
2591        let config = EventBusConfig::builder()
2592            .num_shards(2)
2593            .ring_buffer_capacity(1024)
2594            .scaling(policy)
2595            .build()
2596            .unwrap();
2597
2598        let bus = EventBus::new(config).await.unwrap();
2599
2600        // Verify dynamic scaling is enabled
2601        assert!(bus.is_dynamic_scaling_enabled());
2602        assert_eq!(bus.num_shards(), 2);
2603
2604        // Ingest some events
2605        for i in 0..100 {
2606            let event = Event::new(json!({"index": i}));
2607            bus.ingest(event).unwrap();
2608        }
2609
2610        // Give workers time to process
2611        tokio::time::sleep(Duration::from_millis(100)).await;
2612
2613        // Check stats
2614        assert_eq!(
2615            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2616            100
2617        );
2618
2619        bus.shutdown().await.unwrap();
2620    }
2621
2622    #[tokio::test]
2623    async fn test_manual_scale_up() {
2624        let policy = ScalingPolicy {
2625            min_shards: 2,
2626            max_shards: 8,
2627            cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2628            ..Default::default()
2629        };
2630
2631        let config = EventBusConfig::builder()
2632            .num_shards(2)
2633            .ring_buffer_capacity(1024)
2634            .scaling(policy)
2635            .build()
2636            .unwrap();
2637
2638        let bus = EventBus::new(config).await.unwrap();
2639
2640        assert_eq!(bus.num_shards(), 2);
2641
2642        // Manually scale up
2643        let new_ids = bus.manual_scale_up(2).await.unwrap();
2644        assert_eq!(new_ids.len(), 2);
2645        assert_eq!(bus.num_shards(), 4);
2646
2647        // Ingest events - they should be distributed across all shards
2648        for i in 0..100 {
2649            let event = Event::new(json!({"index": i}));
2650            bus.ingest(event).unwrap();
2651        }
2652
2653        tokio::time::sleep(Duration::from_millis(100)).await;
2654
2655        assert_eq!(
2656            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2657            100
2658        );
2659
2660        bus.shutdown().await.unwrap();
2661    }
2662
2663    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2664    /// `manual_scale_down` only called `mapper.scale_down(count)`,
2665    /// which marks shards as `Draining` but does NOT finalize them.
2666    /// Bus configs without an active scaling monitor (or callers
2667    /// shutting down before the monitor's next tick) lost any
2668    /// events queued in the drained shards' ring buffers because
2669    /// `remove_shard_internal` was never invoked. The fix runs the
2670    /// full lifecycle synchronously: scale_down → poll for empty →
2671    /// finalize_draining → remove_shard_internal.
2672    ///
2673    /// We pin this by scaling up, manually scaling down, and
2674    /// asserting that `num_shards` actually decreases — pre-fix
2675    /// the count would still reflect the Draining shards.
2676    #[tokio::test]
2677    async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2678        let policy = ScalingPolicy {
2679            min_shards: 2,
2680            max_shards: 8,
2681            cooldown: Duration::from_nanos(1),
2682            ..Default::default()
2683        };
2684        let config = EventBusConfig::builder()
2685            .num_shards(2)
2686            .ring_buffer_capacity(1024)
2687            .scaling(policy)
2688            .build()
2689            .unwrap();
2690        let bus = EventBus::new(config).await.unwrap();
2691
2692        // Scale up to 4, then back down to 2.
2693        let added = bus.manual_scale_up(2).await.unwrap();
2694        assert_eq!(added.len(), 2);
2695        assert_eq!(bus.num_shards(), 4);
2696
2697        let removed = bus.manual_scale_down(2).await.unwrap();
2698        assert_eq!(
2699            removed.len(),
2700            2,
2701            "manual_scale_down must complete the lifecycle for both \
2702             requested shards (mark Draining → wait → finalize → remove)"
2703        );
2704
2705        // Pre-fix: `num_shards` would still be 4 because shards
2706        // were only marked Draining (and the routing-table removal
2707        // path never ran). Post-fix: it's back to 2.
2708        assert_eq!(
2709            bus.num_shards(),
2710            2,
2711            "drained shards must be removed from the routing table"
2712        );
2713
2714        bus.shutdown().await.unwrap();
2715    }
2716
2717    #[tokio::test]
2718    async fn test_shard_metrics() {
2719        let policy = ScalingPolicy::default();
2720
2721        let config = EventBusConfig::builder()
2722            .num_shards(2)
2723            .ring_buffer_capacity(1024)
2724            .scaling(policy)
2725            .build()
2726            .unwrap();
2727
2728        let bus = EventBus::new(config).await.unwrap();
2729
2730        // Ingest some events
2731        for i in 0..50 {
2732            let event = Event::new(json!({"index": i}));
2733            bus.ingest(event).unwrap();
2734        }
2735
2736        // Get metrics
2737        let metrics = bus.shard_metrics();
2738        assert!(metrics.is_some());
2739        let metrics = metrics.unwrap();
2740        assert_eq!(metrics.len(), 2);
2741
2742        bus.shutdown().await.unwrap();
2743    }
2744
2745    #[tokio::test]
2746    async fn test_regression_eventbus_drop_signals_shutdown() {
2747        // Regression: dropping an EventBus without calling shutdown() used to
2748        // leave background tasks running indefinitely. The Drop impl now sets
2749        // the shutdown flag so workers eventually exit.
2750        let result = tokio::time::timeout(Duration::from_secs(5), async {
2751            let config = EventBusConfig::builder()
2752                .num_shards(2)
2753                .ring_buffer_capacity(1024)
2754                .build()
2755                .unwrap();
2756
2757            let bus = EventBus::new(config).await.unwrap();
2758
2759            // Ingest some events
2760            for i in 0..10 {
2761                let event = Event::new(json!({"index": i}));
2762                bus.ingest(event).unwrap();
2763            }
2764
2765            // Drop without calling shutdown()
2766            drop(bus);
2767
2768            // If we reach here, the drop didn't hang
2769        })
2770        .await;
2771
2772        assert!(
2773            result.is_ok(),
2774            "EventBus drop should not hang — Drop impl must signal shutdown"
2775        );
2776    }
2777
2778    #[tokio::test]
2779    async fn test_with_dynamic_scaling_builder() {
2780        let config = EventBusConfig::builder()
2781            .num_shards(4)
2782            .ring_buffer_capacity(2048)
2783            .with_dynamic_scaling()
2784            .build()
2785            .unwrap();
2786
2787        let bus = EventBus::new(config).await.unwrap();
2788
2789        assert!(bus.is_dynamic_scaling_enabled());
2790        assert_eq!(bus.num_shards(), 4);
2791
2792        bus.shutdown().await.unwrap();
2793    }
2794
2795    /// Mock adapter that counts `on_batch` invocations and returns a
2796    /// configurable error variant. Used to assert dispatch retry
2797    /// semantics without dragging in a real adapter.
2798    struct CountingErrAdapter {
2799        calls: Arc<std::sync::atomic::AtomicU32>,
2800        make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
2801    }
2802
2803    #[async_trait::async_trait]
2804    impl crate::adapter::Adapter for CountingErrAdapter {
2805        async fn init(&mut self) -> Result<(), AdapterError> {
2806            Ok(())
2807        }
2808        async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
2809            self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2810            Err((self.make_err)())
2811        }
2812        async fn flush(&self) -> Result<(), AdapterError> {
2813            Ok(())
2814        }
2815        async fn shutdown(&self) -> Result<(), AdapterError> {
2816            Ok(())
2817        }
2818        async fn poll_shard(
2819            &self,
2820            _shard_id: u16,
2821            _from_id: Option<&str>,
2822            _limit: usize,
2823        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2824            Ok(crate::adapter::ShardPollResult::empty())
2825        }
2826        fn name(&self) -> &'static str {
2827            "counting_err"
2828        }
2829        async fn is_healthy(&self) -> bool {
2830            true
2831        }
2832    }
2833
2834    /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
2835    /// retried every error variant, ignoring `AdapterError::is_retryable`.
2836    /// A non-retryable error (Connection / Fatal / Serialization)
2837    /// should now drop the batch immediately rather than burn the
2838    /// retry budget on something that cannot succeed.
2839    #[tokio::test(start_paused = true)]
2840    async fn dispatch_batch_skips_retries_on_non_retryable_error() {
2841        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2842        let adapter = CountingErrAdapter {
2843            calls: calls.clone(),
2844            make_err: Box::new(|| AdapterError::Connection("refused".into())),
2845        };
2846
2847        let batch = Batch::new(0, vec![], 0);
2848        let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 5).await;
2849
2850        assert!(!ok, "non-retryable error must drop batch");
2851        assert_eq!(
2852            calls.load(std::sync::atomic::Ordering::SeqCst),
2853            1,
2854            "Connection error must not be retried; expected exactly 1 on_batch call"
2855        );
2856    }
2857
2858    /// Sanity: a retryable error *does* go through the full retry
2859    /// budget. Without this companion check, the previous test could
2860    /// pass for the wrong reason (e.g. if dispatch always returned on
2861    /// the first error).
2862    #[tokio::test(start_paused = true)]
2863    async fn dispatch_batch_retries_transient_errors() {
2864        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2865        let adapter = CountingErrAdapter {
2866            calls: calls.clone(),
2867            make_err: Box::new(|| AdapterError::Transient("temp".into())),
2868        };
2869
2870        let batch = Batch::new(0, vec![], 0);
2871        let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 3).await;
2872
2873        assert!(!ok);
2874        // 3 retries + 1 final attempt = 4 total calls.
2875        assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
2876    }
2877
2878    /// Pin the perf contract behind the `Arc<Batch>` change.
2879    ///
2880    /// Pre-fix `dispatch_batch` deep-cloned the events `Vec` on every
2881    /// retry attempt (and the comment in production code already
2882    /// admitted this was wasted on the common `retries == 0` path).
2883    /// Post-fix the dispatcher takes `Arc<Batch>` and each retry is a
2884    /// refcount bump.
2885    ///
2886    /// We pin the contract observationally: an adapter that fails 3
2887    /// times then succeeds receives the *same* `Arc<Batch>` pointer
2888    /// on every call. A regression that ever rebuilt the Batch
2889    /// (e.g. a future refactor that does `Arc::new(batch.as_ref().clone())`)
2890    /// would surface here as distinct `Arc::as_ptr` values.
2891    #[tokio::test(start_paused = true)]
2892    async fn dispatch_batch_retries_share_the_same_arc_allocation() {
2893        // Identity-record `Arc::as_ptr(&batch)` per call via the
2894        // batch's strong-count snapshot. We cast to `usize` so the
2895        // adapter can stay `Send + Sync` without an unsafe impl —
2896        // the value is just an opaque identity stamp.
2897        struct PointerRecordingAdapter {
2898            seen: Arc<parking_lot::Mutex<Vec<usize>>>,
2899            fail_first_n: u32,
2900            calls: Arc<std::sync::atomic::AtomicU32>,
2901        }
2902
2903        #[async_trait::async_trait]
2904        impl crate::adapter::Adapter for PointerRecordingAdapter {
2905            async fn init(&mut self) -> Result<(), AdapterError> {
2906                Ok(())
2907            }
2908            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
2909                self.seen.lock().push(Arc::as_ptr(&batch) as usize);
2910                let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2911                if n < self.fail_first_n {
2912                    Err(AdapterError::Transient("retry me".into()))
2913                } else {
2914                    Ok(())
2915                }
2916            }
2917            async fn flush(&self) -> Result<(), AdapterError> {
2918                Ok(())
2919            }
2920            async fn shutdown(&self) -> Result<(), AdapterError> {
2921                Ok(())
2922            }
2923            async fn poll_shard(
2924                &self,
2925                _shard_id: u16,
2926                _from_id: Option<&str>,
2927                _limit: usize,
2928            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2929                Ok(crate::adapter::ShardPollResult::empty())
2930            }
2931            fn name(&self) -> &'static str {
2932                "pointer-recording"
2933            }
2934        }
2935
2936        let seen = Arc::new(parking_lot::Mutex::new(Vec::new()));
2937        let adapter = PointerRecordingAdapter {
2938            seen: Arc::clone(&seen),
2939            fail_first_n: 3,
2940            calls: Arc::new(std::sync::atomic::AtomicU32::new(0)),
2941        };
2942        let batch = Arc::new(Batch::new(0, vec![], 0));
2943        let original_ptr = Arc::as_ptr(&batch) as usize;
2944
2945        let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
2946        assert!(ok, "adapter accepts on 4th try; dispatch must succeed");
2947
2948        let pointers = seen.lock().clone();
2949        assert_eq!(
2950            pointers.len(),
2951            4,
2952            "expected 3 failures + 1 success = 4 on_batch calls",
2953        );
2954        // Every call observed the SAME Arc target. A regression that
2955        // deep-clones would have produced four distinct pointers.
2956        for (i, p) in pointers.iter().enumerate() {
2957            assert_eq!(
2958                *p, original_ptr,
2959                "attempt {i} saw a different Arc<Batch>; dispatch_batch deep-cloned",
2960            );
2961        }
2962    }
2963
2964    /// Counting adapter that records the number of events delivered via
2965    /// `on_batch`. Used by shutdown-durability tests below.
2966    struct CountingAdapter {
2967        received: Arc<AtomicU64>,
2968    }
2969
2970    #[async_trait::async_trait]
2971    impl crate::adapter::Adapter for CountingAdapter {
2972        async fn init(&mut self) -> Result<(), AdapterError> {
2973            Ok(())
2974        }
2975        async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
2976            self.received
2977                .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
2978            Ok(())
2979        }
2980        async fn flush(&self) -> Result<(), AdapterError> {
2981            Ok(())
2982        }
2983        async fn shutdown(&self) -> Result<(), AdapterError> {
2984            Ok(())
2985        }
2986        async fn poll_shard(
2987            &self,
2988            _shard_id: u16,
2989            _from_id: Option<&str>,
2990            _limit: usize,
2991        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2992            Ok(crate::adapter::ShardPollResult::empty())
2993        }
2994        fn name(&self) -> &'static str {
2995            "counting"
2996        }
2997        async fn is_healthy(&self) -> bool {
2998            true
2999        }
3000    }
3001
3002    /// `retry_backoff` exponentially grows the base delay
3003    /// per attempt and adds per-(shard, attempt) jitter to
3004    /// decorrelate retries across shards. Pin both invariants:
3005    /// monotonic growth on the base, and jitter that produces
3006    /// different outputs for different shard ids.
3007    #[test]
3008    fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
3009        // Shard 0 attempt 0..6: base ms 100, 200, 400, 800, 1600,
3010        // 3200, 3200 (cap). Plus ±25% jitter.
3011        let s0_a0 = retry_backoff(0, 0).as_millis();
3012        let s0_a1 = retry_backoff(0, 1).as_millis();
3013        let s0_a4 = retry_backoff(0, 4).as_millis();
3014        let s0_a5 = retry_backoff(0, 5).as_millis();
3015        let s0_a6 = retry_backoff(0, 6).as_millis();
3016
3017        // Bounds: each attempt's base is in `[base*0.75, base*1.25)`.
3018        assert!((75..=125).contains(&s0_a0));
3019        assert!((150..=250).contains(&s0_a1));
3020        assert!((1200..=2000).contains(&s0_a4));
3021        assert!((2400..=4000).contains(&s0_a5));
3022        // Cap at attempt=5: attempt 6 must NOT exceed attempt 5's
3023        // upper bound.
3024        assert!(
3025            s0_a6 <= 4000,
3026            "attempt > 5 must cap at the attempt-5 base; got {}ms",
3027            s0_a6
3028        );
3029
3030        // Jitter property: different shards at the same
3031        // attempt land on different backoffs. Sample 16 distinct
3032        // shard ids and assert at least 4 unique backoff values.
3033        //
3034        // The bound is deliberately loose (4 / 16) because
3035        // `DefaultHasher`'s exact distribution is **not stable**
3036        // across Rust toolchain versions — a tighter check (e.g.
3037        // ≥ 8) would empirically pass on every toolchain we test
3038        // against today, but a future stdlib change to the hasher
3039        // could shift the distribution and flake CI for a property
3040        // (decorrelation across shards) that doesn't actually
3041        // depend on a high collision-resistance bar. Asserting
3042        // ≥ 4 unique values out of 16 is enough to catch a real
3043        // regression (e.g. accidentally hashing only `attempt`
3044        // and not `shard_id` would collapse all 16 to a single
3045        // value) while staying robust to hasher-distribution
3046        // drift.
3047        use std::collections::HashSet;
3048        let s_attempt2: HashSet<u128> = (0u16..16)
3049            .map(|s| retry_backoff(s, 2).as_millis())
3050            .collect();
3051        assert!(
3052            s_attempt2.len() >= 4,
3053            "jitter must decorrelate retries across shards; \
3054             only {} unique backoffs across 16 shards",
3055            s_attempt2.len()
3056        );
3057    }
3058
3059    /// CR-23: pin that `EventBus::shutdown` actually invokes the
3060    /// adapter's `flush()` and `shutdown()` methods. The existing
3061    /// `sdk/tests/shutdown_regression.rs` covers the
3062    /// "shutdown runs even with outstanding Arc clones" property
3063    /// using a memory adapter whose `flush`/`shutdown` are no-ops
3064    /// — so a regression that elided the adapter calls would still
3065    /// pass. This test uses a recording adapter that increments
3066    /// per-method counters; we assert flush AND shutdown both fired
3067    /// exactly once after a clean `bus.shutdown().await`.
3068    ///
3069    /// The fix routes `Net::shutdown` through
3070    /// `shutdown_via_ref`, which in turn calls
3071    /// `self.adapter.flush()` and `self.adapter.shutdown()` once
3072    /// each. CR-23 pins this contract at the bus layer so an
3073    /// inadvertent regression at the SDK or adapter wrapper layer
3074    /// can be caught without an integration setup.
3075    #[tokio::test]
3076    async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
3077        struct RecordingAdapter {
3078            on_batch_calls: Arc<AtomicU64>,
3079            flush_calls: Arc<AtomicU64>,
3080            shutdown_calls: Arc<AtomicU64>,
3081        }
3082
3083        #[async_trait::async_trait]
3084        impl crate::adapter::Adapter for RecordingAdapter {
3085            async fn init(&mut self) -> Result<(), AdapterError> {
3086                Ok(())
3087            }
3088            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3089                self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
3090                Ok(())
3091            }
3092            async fn flush(&self) -> Result<(), AdapterError> {
3093                self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
3094                Ok(())
3095            }
3096            async fn shutdown(&self) -> Result<(), AdapterError> {
3097                self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
3098                Ok(())
3099            }
3100            async fn poll_shard(
3101                &self,
3102                _shard_id: u16,
3103                _from_id: Option<&str>,
3104                _limit: usize,
3105            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3106                Ok(crate::adapter::ShardPollResult::empty())
3107            }
3108            fn name(&self) -> &'static str {
3109                "cr23-recording"
3110            }
3111            async fn is_healthy(&self) -> bool {
3112                true
3113            }
3114        }
3115
3116        let on_batch = Arc::new(AtomicU64::new(0));
3117        let flush = Arc::new(AtomicU64::new(0));
3118        let shutdown = Arc::new(AtomicU64::new(0));
3119        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
3120            on_batch_calls: on_batch.clone(),
3121            flush_calls: flush.clone(),
3122            shutdown_calls: shutdown.clone(),
3123        });
3124
3125        let config = EventBusConfig::builder()
3126            .num_shards(2)
3127            .ring_buffer_capacity(1024)
3128            .build()
3129            .unwrap();
3130        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3131
3132        // Drive a small burst so on_batch fires at least once —
3133        // pins that the adapter is wired up correctly. The
3134        // load-bearing assertions below are on flush and shutdown.
3135        for i in 0..16 {
3136            let _ = bus.ingest(Event::new(json!({"i": i})));
3137        }
3138
3139        // Pre-CR-23 a regression that elided one of these would
3140        // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3141        // because the memory adapter's flush/shutdown are no-ops.
3142        // Here the recording adapter makes the contract observable.
3143        bus.shutdown().await.unwrap();
3144
3145        assert!(
3146            on_batch.load(AtomicOrdering::SeqCst) > 0,
3147            "sanity: on_batch must have fired at least once"
3148        );
3149        assert_eq!(
3150            flush.load(AtomicOrdering::SeqCst),
3151            1,
3152            "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3153        );
3154        assert_eq!(
3155            shutdown.load(AtomicOrdering::SeqCst),
3156            1,
3157            "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3158        );
3159    }
3160
3161    /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3162    /// CAS loses (because a first caller already flipped the
3163    /// `shutdown` flag) and whose deadline elapses BEFORE the
3164    /// first caller sets `shutdown_completed=true` returns
3165    /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3166    ///
3167    /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3168    /// the CAS race had no way to distinguish "first caller
3169    /// finished shutdown" from "deadline timed out mid-shutdown."
3170    /// Under a slow adapter (`adapter_timeout` default 30s >
3171    /// the 10s spin deadline), the second caller silently saw
3172    /// `Ok` while the bus was still mid-shutdown — letting
3173    /// subsequent code observe a partially-shut-down bus.
3174    ///
3175    /// We use a slow first caller (sleeps inside `flush()`)
3176    /// and override the spin deadline to a few ms so the test
3177    /// runs fast.
3178    #[tokio::test]
3179    async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3180        struct SlowFlushAdapter {
3181            // Block flush() for this long. The first caller
3182            // gets stuck here while the second caller's spin
3183            // deadline elapses.
3184            flush_delay: std::time::Duration,
3185        }
3186
3187        #[async_trait::async_trait]
3188        impl crate::adapter::Adapter for SlowFlushAdapter {
3189            async fn init(&mut self) -> Result<(), AdapterError> {
3190                Ok(())
3191            }
3192            async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3193                Ok(())
3194            }
3195            async fn flush(&self) -> Result<(), AdapterError> {
3196                tokio::time::sleep(self.flush_delay).await;
3197                Ok(())
3198            }
3199            async fn shutdown(&self) -> Result<(), AdapterError> {
3200                Ok(())
3201            }
3202            async fn poll_shard(
3203                &self,
3204                _shard_id: u16,
3205                _from_id: Option<&str>,
3206                _limit: usize,
3207            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3208                Ok(crate::adapter::ShardPollResult::empty())
3209            }
3210            fn name(&self) -> &'static str {
3211                "cr25-slow-flush"
3212            }
3213            async fn is_healthy(&self) -> bool {
3214                true
3215            }
3216        }
3217
3218        // Cubic P2: serialize access to the global deadline
3219        // override so concurrent tests don't interfere. Hold the
3220        // guard until the override is reset to 0 below.
3221        let _override_guard = super::shutdown_deadline_override_lock().await;
3222
3223        // Override the second-caller spin deadline to a short
3224        // value so the test doesn't wall-clock-wait 10s. Production
3225        // builds use the 10s default (the cfg(test) override is
3226        // compiled out).
3227        super::set_shutdown_via_ref_spin_deadline_for_test(50);
3228
3229        let config = EventBusConfig::builder()
3230            .num_shards(2)
3231            .ring_buffer_capacity(1024)
3232            .adapter_timeout(std::time::Duration::from_secs(5))
3233            .build()
3234            .unwrap();
3235        // First caller's flush() sleeps 500ms — far longer than
3236        // the 50ms spin deadline.
3237        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3238            flush_delay: std::time::Duration::from_millis(500),
3239        });
3240        let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3241
3242        // Spawn the FIRST caller — it wins the CAS and proceeds
3243        // into the slow flush. We don't await it; we want it
3244        // running in parallel.
3245        let bus_first = Arc::clone(&bus);
3246        let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3247
3248        // Cubic P2: poll `is_shutdown()` until the first caller
3249        // has set the flag, instead of a fixed sleep. This makes
3250        // the test scheduler-independent — we proceed as soon as
3251        // the first caller has won the CAS, regardless of how
3252        // tokio happens to schedule things. Bounded by a 1s
3253        // timeout so a regression that prevents `shutdown_via_ref`
3254        // from setting the flag doesn't hang the test.
3255        tokio::time::timeout(std::time::Duration::from_secs(1), async {
3256            while !bus.is_shutdown() {
3257                tokio::task::yield_now().await;
3258            }
3259        })
3260        .await
3261        .expect("first caller did not set shutdown flag within 1s");
3262
3263        // The second caller's CAS will fail; it enters the spin
3264        // and times out at 50ms.
3265        let start = std::time::Instant::now();
3266        let second_result = bus.shutdown_via_ref().await;
3267        let elapsed = start.elapsed();
3268
3269        // Reset the override for other tests, then drop the guard.
3270        super::set_shutdown_via_ref_spin_deadline_for_test(0);
3271
3272        // CR-25 contract: the second caller MUST get a Transient
3273        // error, not a silent Ok.
3274        let err = second_result.expect_err(
3275            "CR-25 regression: second caller MUST surface AdapterError::Transient \
3276             when its deadline elapses, NOT a silent Ok",
3277        );
3278        match err {
3279            AdapterError::Transient(msg) => {
3280                assert!(
3281                    msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3282                    "error message must reference the deadline path; got: {msg}"
3283                );
3284            }
3285            other => panic!("expected Transient, got {:?}", other),
3286        }
3287
3288        // Sanity: the second caller's elapsed time is bounded by
3289        // the override (50ms) + scheduler slop. If this is
3290        // anywhere near the production 10s, the cfg(test)
3291        // override path broke.
3292        assert!(
3293            elapsed < std::time::Duration::from_secs(5),
3294            "second caller took {elapsed:?}; the cfg(test) deadline override \
3295             broke if this is near the 10s production default"
3296        );
3297
3298        // Wait for the first caller to finish. Even though it
3299        // took the slow path, the bus IS shutting down and will
3300        // eventually complete.
3301        let _ = first_caller.await.unwrap();
3302        assert!(bus.is_shutdown_completed());
3303    }
3304
3305    /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3306    /// successfully-ingested event to the adapter before returning.
3307    /// Pins the broader durability contract that the
3308    /// `drain_finalize_ready` gate supports: the drain worker may not
3309    /// finalize until the in-flight wait completes.
3310    ///
3311    /// Tests across many shards with bursts large enough that the
3312    /// drain workers are mid-loop when shutdown begins.
3313    #[tokio::test]
3314    async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3315        let received = Arc::new(AtomicU64::new(0));
3316        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3317            received: received.clone(),
3318        });
3319
3320        let config = EventBusConfig::builder()
3321            .num_shards(4)
3322            .ring_buffer_capacity(4096)
3323            .build()
3324            .unwrap();
3325        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3326
3327        // Drive a sizable burst across all shards. Capacity > burst so
3328        // we don't trip backpressure; every successful Ok must reach
3329        // `on_batch` before shutdown returns.
3330        let total = 10_000usize;
3331        let mut successes = 0u64;
3332        for i in 0..total {
3333            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3334                successes += 1;
3335            }
3336        }
3337
3338        // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3339        // those workers wait on `drain_finalize_ready` after observing
3340        // `shutdown=true`, so any push the producer made before the
3341        // shutdown flag is guaranteed to be in the ring buffer when
3342        // the final sweep runs.
3343        bus.shutdown().await.unwrap();
3344
3345        let delivered = received.load(AtomicOrdering::SeqCst);
3346        assert_eq!(
3347            delivered, successes,
3348            "shutdown stranded events: {successes} ingested successfully, \
3349             only {delivered} reached the adapter"
3350        );
3351    }
3352
3353    /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3354    /// barrier: after it returns successfully, every event the
3355    /// caller successfully ingested before `flush()` was called
3356    /// must have been handed to the adapter via `on_batch`.
3357    /// The previous implementation slept a single `batch.max_delay`
3358    /// after the ring buffers drained, which left a window where
3359    /// events could still be sitting in the per-shard mpsc channel
3360    /// or inside a partially-filled batch awaiting timeout — those
3361    /// events were silently dropped from the flush guarantee.
3362    #[tokio::test]
3363    async fn flush_is_a_delivery_barrier() {
3364        let received = Arc::new(AtomicU64::new(0));
3365        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3366            received: received.clone(),
3367        });
3368
3369        // Use a deliberately *long* batch.max_delay (250ms) so that a
3370        // partially-filled batch sitting in the batch worker's
3371        // pending state would survive past the old single-`max_delay`
3372        // sleep. min_size > burst forces the partial-batch path.
3373        let config = EventBusConfig::builder()
3374            .num_shards(2)
3375            .ring_buffer_capacity(1024)
3376            .batch(crate::config::BatchConfig {
3377                min_size: 1_000,
3378                max_size: 10_000,
3379                max_delay: Duration::from_millis(250),
3380                adaptive: false,
3381                velocity_window: Duration::from_millis(100),
3382            })
3383            .build()
3384            .unwrap();
3385        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3386
3387        // A small burst — far below `min_size`, so the batch worker
3388        // will sit on a partial batch waiting for `max_delay`.
3389        let burst = 50usize;
3390        let mut successes = 0u64;
3391        for i in 0..burst {
3392            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3393                successes += 1;
3394            }
3395        }
3396
3397        // Time the flush call to confirm we waited long enough for
3398        // the partial batch to time out. The previous code slept
3399        // ~10ms total in the post-empty phase; the fix waits up to
3400        // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3401        let t0 = std::time::Instant::now();
3402        bus.flush().await.unwrap();
3403        let elapsed = t0.elapsed();
3404
3405        // After flush returns, every successful ingest must have
3406        // been delivered to the adapter. With the old code this
3407        // assertion would fail: events sit in the partial batch
3408        // until `max_delay` (250ms) elapses, but flush returned
3409        // after only ~10ms.
3410        let delivered = received.load(AtomicOrdering::SeqCst);
3411        assert_eq!(
3412            delivered, successes,
3413            "flush() returned but only {delivered} of {successes} \
3414             events reached the adapter (#16); flush waited {:?}",
3415            elapsed
3416        );
3417
3418        bus.shutdown().await.unwrap();
3419    }
3420
3421    /// Regression (Phase 1): when configured with a
3422    /// persistent `producer_nonce_path`, two bus instances launched
3423    /// against the same path stamp the SAME nonce on every emitted
3424    /// batch. JetStream / Redis adapters key dedup on this nonce, so
3425    /// a producer that crashed mid-batch and restarted (within the
3426    /// backend's dedup window) issues retries with the same msg-ids
3427    /// and the backend correctly recognizes them as duplicates.
3428    ///
3429    /// Pre-fix the per-process nonce regenerated on every startup,
3430    /// so post-crash retries wrote NEW msg-ids and the backend
3431    /// persisted the partial-batch's accepted half twice.
3432    #[tokio::test]
3433    async fn persistent_producer_nonce_survives_bus_restart() {
3434        // Use a per-test temp file so concurrent runs don't collide.
3435        let mut nonce_path = std::env::temp_dir();
3436        let pid = std::process::id();
3437        let nanos = std::time::SystemTime::now()
3438            .duration_since(std::time::UNIX_EPOCH)
3439            .map(|d| d.as_nanos())
3440            .unwrap_or(0);
3441        nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3442
3443        let make_config = |path: &std::path::Path| {
3444            EventBusConfig::builder()
3445                .num_shards(1)
3446                .ring_buffer_capacity(1024)
3447                .producer_nonce_path(path)
3448                .build()
3449                .unwrap()
3450        };
3451
3452        // First bus: ingest one event. Read its nonce off the
3453        // adapter-bound batch via a recording adapter.
3454        struct NonceRecordingAdapter {
3455            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3456        }
3457        #[async_trait::async_trait]
3458        impl crate::adapter::Adapter for NonceRecordingAdapter {
3459            async fn init(&mut self) -> Result<(), AdapterError> {
3460                Ok(())
3461            }
3462            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3463                *self.nonce.lock() = Some(batch.process_nonce);
3464                Ok(())
3465            }
3466            async fn flush(&self) -> Result<(), AdapterError> {
3467                Ok(())
3468            }
3469            async fn shutdown(&self) -> Result<(), AdapterError> {
3470                Ok(())
3471            }
3472            async fn poll_shard(
3473                &self,
3474                _: u16,
3475                _: Option<&str>,
3476                _: usize,
3477            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3478                Ok(crate::adapter::ShardPollResult::empty())
3479            }
3480            fn name(&self) -> &'static str {
3481                "nonce-recording"
3482            }
3483        }
3484
3485        let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3486        {
3487            let bus = EventBus::new_with_adapter(
3488                make_config(&nonce_path),
3489                Box::new(NonceRecordingAdapter {
3490                    nonce: nonce_first_run.clone(),
3491                }),
3492            )
3493            .await
3494            .unwrap();
3495            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3496            bus.flush().await.unwrap();
3497            bus.shutdown().await.unwrap();
3498        }
3499
3500        let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3501        {
3502            let bus = EventBus::new_with_adapter(
3503                make_config(&nonce_path),
3504                Box::new(NonceRecordingAdapter {
3505                    nonce: nonce_second_run.clone(),
3506                }),
3507            )
3508            .await
3509            .unwrap();
3510            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3511            bus.flush().await.unwrap();
3512            bus.shutdown().await.unwrap();
3513        }
3514
3515        let n_a = nonce_first_run
3516            .lock()
3517            .expect("first bus must have dispatched a batch");
3518        let n_b = nonce_second_run
3519            .lock()
3520            .expect("second bus must have dispatched a batch");
3521        assert_eq!(
3522            n_a, n_b,
3523            "two bus instances against the same producer_nonce_path \
3524             must stamp the same nonce — pre-fix this regenerated on \
3525             every restart and JetStream's dedup window saw new \
3526             msg-ids as fresh batches",
3527        );
3528
3529        // Cleanup.
3530        let _ = std::fs::remove_file(&nonce_path);
3531    }
3532
3533    /// Pin that ALL spawn sites — both the static initial-shard
3534    /// loop in `new_with_adapter` and the dynamic-add path in
3535    /// `add_shard_internal` — clone the bus's loaded
3536    /// `producer_nonce` correctly. Pre-#56 there was no nonce
3537    /// concept at the bus layer; if any future refactor drops the
3538    /// `producer_nonce: self.producer_nonce` line from one of the
3539    /// spawn sites (or stops loading the persistent path), the
3540    /// post-scale-up shard's batches would carry a different nonce
3541    /// and JetStream's cross-restart dedup would silently break for
3542    /// events ingested into the dynamic shard. Pin all observed
3543    /// batches across the static + dynamic shards share the bus's
3544    /// nonce.
3545    #[tokio::test]
3546    async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3547        let mut nonce_path = std::env::temp_dir();
3548        let pid = std::process::id();
3549        let nanos = std::time::SystemTime::now()
3550            .duration_since(std::time::UNIX_EPOCH)
3551            .map(|d| d.as_nanos())
3552            .unwrap_or(0);
3553        nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3554
3555        struct CollectingAdapter {
3556            nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3557        }
3558        #[async_trait::async_trait]
3559        impl crate::adapter::Adapter for CollectingAdapter {
3560            async fn init(&mut self) -> Result<(), AdapterError> {
3561                Ok(())
3562            }
3563            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3564                self.nonces.lock().push(batch.process_nonce);
3565                Ok(())
3566            }
3567            async fn flush(&self) -> Result<(), AdapterError> {
3568                Ok(())
3569            }
3570            async fn shutdown(&self) -> Result<(), AdapterError> {
3571                Ok(())
3572            }
3573            async fn poll_shard(
3574                &self,
3575                _: u16,
3576                _: Option<&str>,
3577                _: usize,
3578            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3579                Ok(crate::adapter::ShardPollResult::empty())
3580            }
3581            fn name(&self) -> &'static str {
3582                "collecting"
3583            }
3584        }
3585
3586        let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3587        let policy = ScalingPolicy {
3588            min_shards: 1,
3589            max_shards: 8,
3590            cooldown: Duration::from_nanos(1),
3591            ..Default::default()
3592        };
3593        let config = EventBusConfig::builder()
3594            .num_shards(2)
3595            .ring_buffer_capacity(1024)
3596            .scaling(policy)
3597            .producer_nonce_path(&nonce_path)
3598            .build()
3599            .unwrap();
3600
3601        let bus = EventBus::new_with_adapter(
3602            config,
3603            Box::new(CollectingAdapter {
3604                nonces: nonces.clone(),
3605            }),
3606        )
3607        .await
3608        .unwrap();
3609
3610        // Drive the two static shards.
3611        for i in 0..200u64 {
3612            let _ = bus.ingest(Event::new(json!({"i": i})));
3613        }
3614        bus.flush().await.unwrap();
3615
3616        // Add a dynamic shard and drive it too.
3617        let _ = bus.manual_scale_up(1).await.unwrap();
3618        for i in 200..400u64 {
3619            let _ = bus.ingest(Event::new(json!({"i": i})));
3620        }
3621        bus.flush().await.unwrap();
3622
3623        bus.shutdown().await.unwrap();
3624
3625        let observed = nonces.lock().clone();
3626        assert!(
3627            !observed.is_empty(),
3628            "expected the adapter to have observed at least one batch",
3629        );
3630        let first = observed[0];
3631        for (i, &n) in observed.iter().enumerate() {
3632            assert_eq!(
3633                n, first,
3634                "batch {i} stamped a different nonce ({n:#x}) than the first \
3635                 batch ({first:#x}) — at least one spawn site (initial-shard \
3636                 loop or `add_shard_internal`) failed to inherit the bus's \
3637                 producer_nonce",
3638            );
3639        }
3640
3641        let _ = std::fs::remove_file(&nonce_path);
3642    }
3643
3644    /// Regression guard for the `(process_nonce, shard_id,
3645    /// sequence_start, i)` JetStream / Redis msg-id construction.
3646    /// Within one bus instance, `sequence_start` per shard MUST be
3647    /// strictly monotonic across batches AND every two adjacent
3648    /// batches `n` and `n+1` from the same shard MUST satisfy
3649    /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3650    /// no overlap. A regression here breaks the at-most-once dedup
3651    /// every persistent adapter relies on: gaps reuse a `(shard,
3652    /// seq, i)` tuple after the dedup window closes, and overlap
3653    /// silently overlays a later batch on an earlier one's slot.
3654    ///
3655    /// The cross-restart variant (persistent `next_sequence` across
3656    /// process boots) is feature-shaped, not a bug fix — without
3657    /// persistence, restart relies on `process_nonce` rotating to
3658    /// disjoin the msg-id namespace (pinned by
3659    /// `persistent_producer_nonce_survives_bus_restart`). This test
3660    /// pins the within-process invariant the persistent nonce
3661    /// builds on.
3662    #[tokio::test]
3663    async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3664        struct ShardSeqRecorder {
3665            batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3666        }
3667        #[async_trait::async_trait]
3668        impl crate::adapter::Adapter for ShardSeqRecorder {
3669            async fn init(&mut self) -> Result<(), AdapterError> {
3670                Ok(())
3671            }
3672            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3673                self.batches.lock().push((
3674                    batch.shard_id,
3675                    batch.sequence_start,
3676                    batch.events.len(),
3677                ));
3678                Ok(())
3679            }
3680            async fn flush(&self) -> Result<(), AdapterError> {
3681                Ok(())
3682            }
3683            async fn shutdown(&self) -> Result<(), AdapterError> {
3684                Ok(())
3685            }
3686            async fn poll_shard(
3687                &self,
3688                _: u16,
3689                _: Option<&str>,
3690                _: usize,
3691            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3692                Ok(crate::adapter::ShardPollResult::empty())
3693            }
3694            fn name(&self) -> &'static str {
3695                "shard-seq-recorder"
3696            }
3697        }
3698
3699        let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3700        let config = EventBusConfig::builder()
3701            .num_shards(4)
3702            .ring_buffer_capacity(1024)
3703            .build()
3704            .unwrap();
3705        let bus = EventBus::new_with_adapter(
3706            config,
3707            Box::new(ShardSeqRecorder {
3708                batches: batches.clone(),
3709            }),
3710        )
3711        .await
3712        .unwrap();
3713
3714        // Three drive-then-flush rounds so each shard sees multiple
3715        // batches; the across-batch monotonicity invariant is what
3716        // we're pinning, so a single batch per shard wouldn't
3717        // exercise it.
3718        for round in 0..3u64 {
3719            for i in 0..200u64 {
3720                bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3721            }
3722            bus.flush().await.unwrap();
3723        }
3724        bus.shutdown().await.unwrap();
3725
3726        let observed = batches.lock().clone();
3727        assert!(
3728            !observed.is_empty(),
3729            "expected the adapter to have observed at least one batch",
3730        );
3731
3732        // Bucket batches per shard, preserving dispatch order.
3733        let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3734            std::collections::HashMap::new();
3735        for (shard, seq_start, len) in observed {
3736            by_shard.entry(shard).or_default().push((seq_start, len));
3737        }
3738
3739        for (shard, runs) in &by_shard {
3740            assert!(
3741                !runs.is_empty(),
3742                "shard {shard}: must have at least one batch",
3743            );
3744            // First batch of every shard starts at the per-shard
3745            // zero — `BatchWorker::next_sequence` is initialized to
3746            // 0 and the first `flush()` reads it before the
3747            // `saturating_add`. A regression that lazily seeds
3748            // `next_sequence` from a non-zero source (e.g. wall
3749            // clock) would trip here.
3750            assert_eq!(
3751                runs[0].0, 0,
3752                "shard {shard}: first batch must start at sequence 0, got {}",
3753                runs[0].0,
3754            );
3755            for window in runs.windows(2) {
3756                let (prev_start, prev_len) = window[0];
3757                let (next_start, _next_len) = window[1];
3758                let expected_next = prev_start
3759                    .checked_add(prev_len as u64)
3760                    .expect("test bounds keep us well below u64::MAX");
3761                assert!(
3762                    next_start > prev_start,
3763                    "shard {shard}: sequence_start must be strictly monotonic; \
3764                     saw {prev_start} → {next_start}",
3765                );
3766                assert_eq!(
3767                    next_start, expected_next,
3768                    "shard {shard}: gap or overlap in sequence_start; \
3769                     prev=({prev_start}, len={prev_len}) → next={next_start}, \
3770                     expected {expected_next}. A gap reuses (shard, seq, i) \
3771                     tuples after the JetStream/Redis dedup window closes; an \
3772                     overlap silently overlays a later batch on an earlier \
3773                     one's slot.",
3774                );
3775            }
3776        }
3777    }
3778
3779    /// Pin the within-process caching contract for the fallback
3780    /// (no-`producer_nonce_path`) path: two bus instances created
3781    /// in the same process see the SAME `batch_process_nonce()`
3782    /// because the helper is `OnceLock`-cached. The
3783    /// "different-across-restarts" semantic is a *process-level*
3784    /// guarantee — restart the process to get a fresh nonce — and
3785    /// is pinned by `persistent_producer_nonce_survives_bus_restart`
3786    /// (which uses a path; the without-path branch of #56 has no
3787    /// cross-restart guarantee by design).
3788    ///
3789    /// Cubic-ai P3: this test was previously named
3790    /// `process_nonce_fallback_differs_across_bus_instances`, which
3791    /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
3792    /// Renamed to match what it actually pins.
3793    #[tokio::test]
3794    async fn process_nonce_fallback_is_cached_within_process() {
3795        struct NonceRecordingAdapter {
3796            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3797        }
3798        #[async_trait::async_trait]
3799        impl crate::adapter::Adapter for NonceRecordingAdapter {
3800            async fn init(&mut self) -> Result<(), AdapterError> {
3801                Ok(())
3802            }
3803            async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3804                *self.nonce.lock() = Some(batch.process_nonce);
3805                Ok(())
3806            }
3807            async fn flush(&self) -> Result<(), AdapterError> {
3808                Ok(())
3809            }
3810            async fn shutdown(&self) -> Result<(), AdapterError> {
3811                Ok(())
3812            }
3813            async fn poll_shard(
3814                &self,
3815                _: u16,
3816                _: Option<&str>,
3817                _: usize,
3818            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3819                Ok(crate::adapter::ShardPollResult::empty())
3820            }
3821            fn name(&self) -> &'static str {
3822                "nonce-recording"
3823            }
3824        }
3825
3826        let cfg = || {
3827            EventBusConfig::builder()
3828                .num_shards(1)
3829                .ring_buffer_capacity(1024)
3830                .build()
3831                .unwrap()
3832        };
3833
3834        let n_a = Arc::new(parking_lot::Mutex::new(None));
3835        let n_b = Arc::new(parking_lot::Mutex::new(None));
3836        {
3837            let bus = EventBus::new_with_adapter(
3838                cfg(),
3839                Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
3840            )
3841            .await
3842            .unwrap();
3843            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3844            bus.flush().await.unwrap();
3845            bus.shutdown().await.unwrap();
3846        }
3847        {
3848            let bus = EventBus::new_with_adapter(
3849                cfg(),
3850                Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
3851            )
3852            .await
3853            .unwrap();
3854            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3855            bus.flush().await.unwrap();
3856            bus.shutdown().await.unwrap();
3857        }
3858
3859        // Note: in a single-process test BOTH bus instances see the
3860        // same `OnceLock`-cached `batch_process_nonce`, so the
3861        // nonces ARE equal here even though the documented
3862        // semantic is "fresh per process." This test pins the
3863        // cached-within-a-process invariant; the across-PROCESSES
3864        // semantic is exercised by the persistent-nonce test
3865        // above (which is the actually-load-bearing path for the
3866        // persistent-nonce fix).
3867        let n_a = n_a.lock().unwrap();
3868        let n_b = n_b.lock().unwrap();
3869        assert_eq!(
3870            n_a, n_b,
3871            "within one process, batch_process_nonce is OnceLock-cached \
3872             so two bus instances see the same nonce — the \
3873             different-across-restarts contract is process-level, \
3874             pinned via `persistent_producer_nonce_survives_bus_restart`",
3875        );
3876    }
3877
3878    /// Regression: `EventBusStats::batches_dispatched`
3879    /// (and the new `events_dispatched`) must actually increment on
3880    /// every successful adapter dispatch. Pre-fix `batches_dispatched`
3881    /// was declared but never updated, so flush()'s Phase 2 progress
3882    /// gate was constant-zero and early-broke after one window —
3883    /// flake on Windows-class timer resolution. Pin both counters
3884    /// directly here so a future refactor that drops the increment
3885    /// fails this test, not the timing-dependent
3886    /// `flush_is_a_delivery_barrier`.
3887    #[tokio::test]
3888    async fn dispatch_increments_bus_level_event_and_batch_counters() {
3889        let received = Arc::new(AtomicU64::new(0));
3890        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3891            received: received.clone(),
3892        });
3893
3894        let config = EventBusConfig::builder()
3895            .num_shards(2)
3896            .ring_buffer_capacity(1024)
3897            .batch(crate::config::BatchConfig {
3898                min_size: 1,
3899                max_size: 10,
3900                max_delay: Duration::from_millis(10),
3901                adaptive: false,
3902                velocity_window: Duration::from_millis(100),
3903            })
3904            .build()
3905            .unwrap();
3906        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3907
3908        for i in 0..50 {
3909            bus.ingest(Event::new(json!({"i": i}))).unwrap();
3910        }
3911        bus.flush().await.unwrap();
3912
3913        let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
3914        let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
3915        assert!(
3916            batches > 0,
3917            "batches_dispatched must be > 0 after flush — pre-fix it was \
3918             never incremented anywhere, breaking flush()'s Phase 2 progress gate",
3919        );
3920        assert_eq!(
3921            events, 50,
3922            "events_dispatched must equal the number of events handed to \
3923             the adapter (got {events}, expected 50)",
3924        );
3925
3926        bus.shutdown().await.unwrap();
3927    }
3928
3929    /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
3930    /// still release the drain-finalize gate so detached drain
3931    /// workers can exit instead of parking on the gate until the
3932    /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
3933    /// keeps the `Drop` impl honest if someone refactors the
3934    /// shutdown gates later.
3935    #[tokio::test]
3936    async fn drop_releases_drain_finalize_gate_promptly() {
3937        let config = EventBusConfig::builder()
3938            .num_shards(2)
3939            .ring_buffer_capacity(1024)
3940            .build()
3941            .unwrap();
3942        let bus = EventBus::new(config).await.unwrap();
3943        let drain_gate = bus.drain_finalize_ready.clone();
3944
3945        // Drop without an awaited shutdown.
3946        drop(bus);
3947
3948        // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
3949        // is 10s; if Drop didn't flip the gate, drain workers would
3950        // park for up to that long before exiting.
3951        assert!(
3952            drain_gate.load(AtomicOrdering::Acquire),
3953            "Drop must release `drain_finalize_ready` so detached drain \
3954             workers exit promptly"
3955        );
3956    }
3957
    /// The lossy-shutdown reconciliation in the shutdown path has TWO
    /// arms after the deadline-snapshot fires:
    ///
    ///   - `actual_drops > 0` → bump `events_dropped`
    ///     (exercised by the deadline path running with real
    ///     producers that never recover, and pinned directly by the
    ///     companion test
    ///     `lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up`)
    ///   - `actual_drops == 0` → clear the eager
    ///     `shutdown_was_lossy = true` because the drain caught
    ///     up to every stranded ingest
    ///
    /// The second arm is the false-positive-alert fix. Pre-fix, the
    /// `was_lossy` flag stayed `true` alongside
    /// `events_dropped == 0` for any deadline-triggered shutdown
    /// whose final drain happened to catch up. Operator dashboards
    /// alerting on `was_lossy && dropped == 0` therefore saw false
    /// positives.
    ///
    /// We pin the clear-flag arm by:
    ///   1. Manually staging `in_flight_ingests = 1` so the
    ///      shutdown spin loop sees pending work.
    ///   2. Using `tokio::time::advance(6s)` to force the
    ///      hardcoded 5-second deadline. The shutdown path's own
    ///      comments state that the deadline is measured with
    ///      `tokio::time::Instant` precisely so paused-time tests
    ///      can virtualize it.
    ///   3. Simulating the stranded producer completing AFTER
    ///      the deadline snapshot: bump `events_ingested += 1`
    ///      and drop the in-flight counter back to 0. The
    ///      post-drain reconciliation reads
    ///      `post_deadline_ingests = 1 - 0 = 1 >= stranded = 1`,
    ///      so `actual_drops == 0` and the arm clears
    ///      `shutdown_was_lossy`.
    ///
    /// NOTE: the yield counts and time advances below are ordering
    /// sensitive under paused time; change them with care.
    #[tokio::test(start_paused = true)]
    async fn lossy_shutdown_reconciliation_clears_was_lossy_when_drain_catches_up() {
        let config = EventBusConfig::builder()
            .num_shards(1)
            .ring_buffer_capacity(1024)
            .without_scaling()
            .build()
            .unwrap();
        let bus = Arc::new(
            EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
                .await
                .unwrap(),
        );

        // Stage one "stranded" in-flight ingest. Real producers
        // would have incremented this via try_enter_ingest and
        // be mid-call to `shard_manager.ingest`; the test helper
        // (defined alongside `EventBus`) holds the counter at 1
        // for the duration of the deadline-window.
        bus.stage_stranded_ingest(1);

        // Spawn shutdown — it'll enter the in-flight spin loop
        // and park on `tokio::time::sleep(1ms)` until time
        // advances.
        let bus_for_shutdown = Arc::clone(&bus);
        let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });

        // Yield enough times for the shutdown task to enter the
        // spin loop. With paused time, the spin's 1ms sleep
        // blocks until we advance.
        for _ in 0..10 {
            tokio::task::yield_now().await;
        }

        // Advance past the 5s deadline. The next loop iteration
        // observes the deadline, captures `(stranded=1,
        // ingested=0, dispatched=0)`, sets `was_lossy = true`,
        // and breaks.
        tokio::time::advance(std::time::Duration::from_secs(6)).await;
        for _ in 0..10 {
            tokio::task::yield_now().await;
        }

        // The eager-set was_lossy flag is visible now.
        assert!(
            bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
            "deadline must have set was_lossy=true (eagerly)",
        );

        // Now simulate the "stranded" producer completing AFTER
        // the deadline. The helper bumps events_ingested with
        // Release ordering before releasing the in-flight slot
        // — matching what a real `ingest` call site does, so
        // the reconciliation reads the same arithmetic.
        bus.complete_stranded_ingest(1);

        // Let shutdown finish — its post-drain reconciliation
        // now reads ingested_after=1, post_deadline_ingests=1,
        // actual_drops=stranded(1)-1=0 → clears was_lossy.
        let _ = shutdown_task.await.unwrap();

        assert!(
            !bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
            "reconciliation must clear was_lossy when drain catches up to every stranded ingest \
             (post_deadline_ingests >= stranded → actual_drops == 0)",
        );
        // And events_dropped must NOT have been bumped — the
        // whole point of the reconciliation arm is that no event
        // was actually lost.
        assert_eq!(
            bus.stats.events_dropped.load(AtomicOrdering::Acquire),
            0,
            "no events were actually dropped — reconciliation must not bump events_dropped",
        );
    }
4065
4066    /// Companion to `lossy_shutdown_reconciliation_clears_was_lossy_*`:
4067    /// pins the OTHER arm of the same if/else at L1615-L1619.
4068    /// When drain does NOT catch up (post_deadline_ingests <
4069    /// stranded), `actual_drops > 0` and `events_dropped` is
4070    /// bumped. The `was_lossy = true` flag set eagerly before the
4071    /// reconciliation stays true here — pre-fix the same code
4072    /// path bumped events_dropped += stranded EAGERLY *and*
4073    /// counted the same events as ingested, breaking the
4074    /// `ingested == dispatched + dropped` invariant.
4075    #[tokio::test(start_paused = true)]
4076    async fn lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up() {
4077        let config = EventBusConfig::builder()
4078            .num_shards(1)
4079            .ring_buffer_capacity(1024)
4080            .without_scaling()
4081            .build()
4082            .unwrap();
4083        let bus = Arc::new(
4084            EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
4085                .await
4086                .unwrap(),
4087        );
4088
4089        // Stage two stranded ingests so the actual_drops count
4090        // is meaningful (not just 0 vs 1).
4091        bus.stage_stranded_ingest(2);
4092
4093        let bus_for_shutdown = Arc::clone(&bus);
4094        let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });
4095
4096        for _ in 0..10 {
4097            tokio::task::yield_now().await;
4098        }
4099        tokio::time::advance(std::time::Duration::from_secs(6)).await;
4100        for _ in 0..10 {
4101            tokio::task::yield_now().await;
4102        }
4103
4104        // Drop the in-flight counter to 0 so subsequent shutdown
4105        // bookkeeping doesn't see phantom in-flight work, but do
4106        // NOT bump events_ingested — the producers never
4107        // "completed," so the reconciliation must classify them
4108        // as drops.
4109        bus.release_stranded_ingest(2);
4110
4111        let _ = shutdown_task.await.unwrap();
4112
4113        assert!(
4114            bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4115            "was_lossy must stay true when drain did not catch up",
4116        );
4117        // stranded=2, post_deadline_ingests=0, actual_drops=2.
4118        // We don't pin == 2 exactly because the "known under-count
4119        // window" doc'd at L1595-L1614 acknowledges a ±1 bias
4120        // under racy interleavings; the floor is what matters.
4121        assert!(
4122            bus.stats.events_dropped.load(AtomicOrdering::Acquire) >= 1,
4123            "events_dropped must reflect at least one stranded ingest",
4124        );
4125    }
4126
4127    /// `remove_shard_internal` at L832-L859 dispatches any
4128    /// stranded ring-buffer events through the adapter as a
4129    /// one-shot batch. The normal scale-down path drains the
4130    /// ring via the drain worker before remove fires, so
4131    /// `stranded.is_empty()` on the happy path and L832-L859
4132    /// never runs — but a race window between drain completion
4133    /// and remove (or a manual remove on a shard whose drain
4134    /// worker is parked) leaves events stranded. Pre-fix any
4135    /// stranded events would have been silently dropped; the
4136    /// L832-L859 path now flushes them and bumps both
4137    /// `batches_dispatched` (+1) and `events_dispatched` (+N).
4138    ///
4139    /// We force the stranded-events condition by:
4140    ///   1. Pausing time so the drain worker can't pull from
4141    ///      the ring (its polling sleeps don't tick).
4142    ///   2. Pushing raw events directly into shard 0's ring via
4143    ///      the `with_shard` API.
4144    ///   3. Calling `remove_shard_internal(0)` — its inner
4145    ///      `shard_manager.remove_shard` then returns the
4146    ///      stranded vec, triggering the L832-L859 dispatch.
4147    #[tokio::test(start_paused = true)]
4148    async fn remove_shard_internal_dispatches_stranded_ring_buffer_events() {
4149        let received = Arc::new(AtomicU64::new(0));
4150        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4151            received: Arc::clone(&received),
4152        });
4153        // Keep the default scaling policy: `remove_shard_internal`
4154        // routes through `shard_manager.remove_shard`, which
4155        // requires the dynamic-scaling mapper. `without_scaling()`
4156        // turns the mapper off and the remove fails with
4157        // "Dynamic scaling not enabled". The scaling MONITOR
4158        // task is a separate concern and isn't auto-started by
4159        // `new_with_adapter` — see
4160        // `start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it`
4161        // for the lifecycle contract.
4162        let config = EventBusConfig::builder()
4163            .num_shards(2)
4164            .ring_buffer_capacity(1024)
4165            .build()
4166            .unwrap();
4167        let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
4168
4169        // Yield enough times for the spawned drain workers to
4170        // reach their first `tokio::time::sleep` and park. With
4171        // `start_paused` they can't tick once parked; the gate
4172        // is here so the worker on shard 0 has *already parked*
4173        // before we push events into the ring. Without this, a
4174        // worker that hadn't yet hit its first park could race
4175        // the push and dispatch via the normal drain path
4176        // instead of the L832-L859 stranded path the test exists
4177        // to pin.
4178        for _ in 0..10 {
4179            tokio::task::yield_now().await;
4180        }
4181
4182        // Stage 5 events directly in shard 0's ring. With paused
4183        // time the drain worker can't pull them out — they sit
4184        // there until remove fires.
4185        let stranded_count = 5u64;
4186        let pushed = bus.shard_manager.with_shard(0, |shard| {
4187            for i in 0..stranded_count {
4188                shard
4189                    .try_push_raw(bytes::Bytes::from(format!("stranded-{i}")))
4190                    .unwrap();
4191            }
4192        });
4193        assert!(
4194            pushed.is_some(),
4195            "shard 0 must exist for try_push_raw to land",
4196        );
4197
4198        let batches_before = bus.stats.batches_dispatched.load(AtomicOrdering::Acquire);
4199        let events_before = bus.stats.events_dispatched.load(AtomicOrdering::Acquire);
4200
4201        // Directly invoke the internal — the production path
4202        // routes through manual_scale_down + finalize_draining,
4203        // but the L832-L859 dispatch arm is the same regardless
4204        // of how we got here.
4205        bus.remove_shard_internal(0)
4206            .await
4207            .expect("remove_shard_internal must succeed");
4208
4209        // The CountingAdapter saw the stranded batch.
4210        assert_eq!(
4211            received.load(AtomicOrdering::SeqCst),
4212            stranded_count,
4213            "stranded events must reach the adapter",
4214        );
4215        // Bus stats bumped by exactly the right amounts.
4216        assert_eq!(
4217            bus.stats.batches_dispatched.load(AtomicOrdering::Acquire),
4218            batches_before + 1,
4219            "exactly one stranded batch must increment batches_dispatched",
4220        );
4221        assert_eq!(
4222            bus.stats.events_dispatched.load(AtomicOrdering::Acquire),
4223            events_before + stranded_count,
4224            "events_dispatched must reflect every stranded event",
4225        );
4226
4227        // Clean shutdown so the remaining shard's drain worker
4228        // exits cleanly rather than getting torn down by the
4229        // bus's `Drop` impl (which silently loses any events
4230        // still in the ring buffers / mpsc channels). Mirrors
4231        // the lifecycle hygiene the other paused-time tests in
4232        // this module follow.
4233        //
4234        // The remaining drain worker (shard 1) is parked on
4235        // `tokio::time::sleep`; tokio's paused-time auto-advance
4236        // wakes it once every task is parked on time, but we
4237        // yield-loop here anyway so the JoinHandle await below
4238        // doesn't depend on auto-advance heuristics.
4239        bus.shutdown_via_ref()
4240            .await
4241            .expect("shutdown must succeed under paused time");
4242    }
4243
4244    /// `EventBus::new_with_adapter` must surface a `Fatal` error
4245    /// — naming the configured path — when `producer_nonce_path`
4246    /// points at a location the runtime can't read or create. The
4247    /// path-load failure path at L292-L301 is the single signal
4248    /// operators get that their persistent-nonce wiring is wrong;
4249    /// silently falling back to the per-process nonce would
4250    /// silently degrade the durability guarantee that path was
4251    /// chosen to provide (at-most-once across restart).
4252    #[tokio::test]
4253    async fn new_with_adapter_surfaces_fatal_when_nonce_path_unreadable() {
4254        // Point at a path whose parent directory doesn't exist.
4255        // `load_or_create` has to mkdir-then-write, so a missing
4256        // parent fails on every platform. Anchoring under
4257        // `std::env::temp_dir()` keeps the path well-formed on
4258        // both Windows (`C:\Users\...\Temp\...`) and Linux/macOS
4259        // (`/tmp/...`) — a hardcoded `C:\\…` literal looks like a
4260        // valid filename on POSIX filesystems and the create
4261        // happily succeeds, silently bypassing the failure-arm
4262        // this test exists to pin.
4263        let marker = "net-coverage-nonexistent-parent";
4264        let bogus_path = std::env::temp_dir()
4265            .join(marker)
4266            .join("deeply")
4267            .join("nested")
4268            .join("nonce");
4269        let config = EventBusConfig::builder()
4270            .num_shards(1)
4271            .ring_buffer_capacity(1024)
4272            .producer_nonce_path(&bogus_path)
4273            .build()
4274            .unwrap();
4275        let adapter: Box<dyn crate::adapter::Adapter> =
4276            Box::new(crate::adapter::NoopAdapter::new());
4277
4278        // EventBus doesn't impl Debug, so we can't unwrap_err
4279        // it directly — pattern-match instead.
4280        let result = EventBus::new_with_adapter(config, adapter).await;
4281        match result {
4282            Err(AdapterError::Fatal(msg)) => {
4283                assert!(
4284                    msg.contains("producer-nonce") || msg.contains("nonce"),
4285                    "error must name the failing subsystem: {msg}",
4286                );
4287                // The path must appear in the message so operators
4288                // can grep their logs to the misconfigured config
4289                // field without guessing. The unique marker
4290                // segment is portable across platforms (the temp
4291                // root differs but the joined marker is the same).
4292                assert!(
4293                    msg.contains(marker),
4294                    "error must include the configured path: {msg}",
4295                );
4296            }
4297            Err(other) => panic!("expected Fatal, got {other:?}"),
4298            Ok(_) => panic!("expected Fatal, got Ok"),
4299        }
4300    }
4301
4302    /// `start_scaling_monitor` must be a no-op on a non-scaling
4303    /// bus AND idempotent under repeated calls on a scaling bus.
4304    /// A regression that drops the early-returns would spawn
4305    /// shadow monitor tasks that hold a Weak<EventBus> until they
4306    /// next observe shutdown — pure-overhead duplicate
4307    /// metrics-tick wakeup rates that compete for the same
4308    /// `evaluate_scaling` lock.
4309    #[tokio::test]
4310    async fn start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it() {
4311        // Scaling explicitly disabled: start_scaling_monitor
4312        // must return before touching the slot. (The builder
4313        // default fills in a ScalingPolicy when scaling isn't
4314        // mentioned — `without_scaling()` is the only way to
4315        // force `config.scaling == None`.)
4316        let config = EventBusConfig::builder()
4317            .num_shards(1)
4318            .ring_buffer_capacity(1024)
4319            .without_scaling()
4320            .build()
4321            .unwrap();
4322        let bus = EventBus::new(config).await.unwrap();
4323        let bus = Arc::new(bus);
4324        bus.start_scaling_monitor();
4325        assert!(
4326            bus.scaling_monitor.lock().is_none(),
4327            "non-scaling bus must not install a monitor",
4328        );
4329
4330        // Scaling enabled: first call installs, second is a no-op
4331        // (the existing handle stays put — proves the early-return
4332        // didn't overwrite the slot).
4333        let policy = crate::shard::ScalingPolicy {
4334            min_shards: 2,
4335            max_shards: 4,
4336            ..Default::default()
4337        };
4338        let config = EventBusConfig::builder()
4339            .num_shards(2)
4340            .ring_buffer_capacity(1024)
4341            .scaling(policy)
4342            .build()
4343            .unwrap();
4344        let bus = Arc::new(EventBus::new(config).await.unwrap());
4345        // EventBus::new doesn't auto-start the monitor — we start
4346        // it explicitly so the test owns the lifecycle.
4347        bus.start_scaling_monitor();
4348        let handle_id_after_first = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4349        assert!(handle_id_after_first.is_some(), "first start must install");
4350
4351        // Second call must NOT replace the handle. If the early-
4352        // return is broken, the slot would carry a new handle id.
4353        bus.start_scaling_monitor();
4354        let handle_id_after_second = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4355        assert_eq!(
4356            handle_id_after_first, handle_id_after_second,
4357            "second start_scaling_monitor must NOT replace the running handle",
4358        );
4359    }
4360}