Skip to main content

net/
bus.rs

//! Main EventBus facade.
//!
//! The EventBus provides a unified API for:
//! - Event ingestion (non-blocking)
//! - Event consumption (async polling with filtering)
//! - Lifecycle management
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14
15use crate::adapter::{Adapter, NoopAdapter};
16use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
17use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
18use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
19use crate::event::{Batch, Event, RawEvent};
20use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
21
22#[cfg(feature = "jetstream")]
23use crate::adapter::JetStreamAdapter;
24#[cfg(feature = "net")]
25use crate::adapter::NetAdapter;
26#[cfg(feature = "redis")]
27use crate::adapter::RedisAdapter;
28
/// The main event bus.
///
/// Owns the shard manager, the storage adapter, the per-shard worker
/// tasks and the atomics that coordinate shutdown with in-flight
/// producers.
///
/// # Example
///
/// ```rust,ignore
/// use net::{EventBus, EventBusConfig, Event};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let bus = EventBus::new(EventBusConfig::default()).await?;
///
///     // Ingest events
///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
///
///     // Poll events
///     let response = bus.poll(ConsumeRequest::new(100)).await?;
///
///     bus.shutdown().await?;
///     Ok(())
/// }
/// ```
pub struct EventBus {
    /// Shard manager for parallel ingestion.
    shard_manager: Arc<ShardManager>,
    /// Adapter for durable storage.
    adapter: Arc<dyn Adapter>,
    /// Poll merger for cross-shard consumption. Replaced wholesale
    /// (via `ArcSwap::store`) whenever the shard topology changes.
    poll_merger: arc_swap::ArcSwap<PollMerger>,
    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
    /// block in `add_shard_internal` / `remove_shard_internal`.
    ///
    /// Without this lock, two callers (e.g. the scaling monitor's
    /// add_shard racing manual_scale_down's remove_shard) read the
    /// shard-id snapshot at slightly different points and then race
    /// on the `arc_swap.store`. The write that lands second can
    /// clobber the more-current view: T1 reads `{0..5}`, T2 reads
    /// `{1..4}`, T2 stores `{1..4}`, T1 stores `{0..5}` — the
    /// published merger then routes polls to the just-removed
    /// shard 0 until the next topology change repairs the snapshot.
    poll_merger_swap_lock: parking_lot::Mutex<()>,
    /// Per-shard worker handles, keyed by shard id.
    ///
    /// Drain and batch handles are stored separately so shutdown can
    /// await drain workers *before* batch workers. Otherwise the
    /// drain worker's final sweep races the batch worker's exit, and
    /// any events the drain worker pushes to the channel after the
    /// batch worker has stopped reading are silently lost.
    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
    /// Channels for sending batches to workers (shard_id -> sender).
    batch_senders: parking_lot::RwLock<
        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
    >,
    /// Shutdown flag.
    shutdown: Arc<AtomicBool>,
    /// Gate telling drain workers that the in-flight wait has
    /// completed and they may safely run their final ring-buffer
    /// sweep.
    ///
    /// Distinct from `shutdown` because observing `shutdown=true`
    /// alone is not enough: a producer that read `shutdown=false`
    /// may still be mid-push, and if the drain worker rushes through
    /// its final sweep before that push is visible the event is
    /// stranded. `shutdown()` sets this after waiting for
    /// `in_flight_ingests==0`, at which point the Acquire load on
    /// the drain side synchronizes-with the Release store here,
    /// transitively chaining through the SeqCst in-flight handshake
    /// to make every observed-pre-shutdown push visible to the drain
    /// worker's subsequent `pop_batch_into`.
    drain_finalize_ready: Arc<AtomicBool>,
    /// In-flight ingest counter.
    ///
    /// Incremented before each ingest's shutdown check and
    /// decremented after the push completes (or bails). `shutdown()`
    /// waits for this to drop to zero *after* setting
    /// `shutdown=true` and *before* setting
    /// `drain_finalize_ready=true`, so no producer is mid-push when
    /// the drain workers do their final sweep. That closes the race
    /// where a producer that observed `shutdown=false` could push
    /// *after* the drain worker's last `pop_batch_into` returned
    /// zero, leaving the event stranded in the ring buffer.
    ///
    /// Previously an `AtomicU32`. A 4-billion-in-flight wrap is not
    /// realistic in production, but the counter participates in the
    /// shutdown protocol — a wrap to 0 would trick the wait-for-zero
    /// loop into thinking shutdown was safe to proceed while
    /// producers were still mid-push. Widened to `AtomicU64` so the
    /// wrap is astronomical (1.8e19 in-flight ingests).
    in_flight_ingests: AtomicU64,
    /// Set to `true` after `shutdown()` runs to completion. `Drop`
    /// uses this to detect "dropped without an awaited shutdown" —
    /// in that case events still in the ring buffers / mpsc channels
    /// are silently lost (see `Drop` impl).
    shutdown_completed: AtomicBool,
    /// Configuration.
    config: EventBusConfig,
    /// Statistics, shared with the batch workers.
    stats: Arc<EventBusStats>,
    /// Producer nonce.
    ///
    /// Loaded from `config.producer_nonce_path` on startup when the
    /// path is configured; otherwise falls back to the per-process
    /// default from `event::batch_process_nonce`. Stamped on every
    /// batch the bus emits — the worker spawn copies this u64 into
    /// `BatchWorkerParams::producer_nonce`, and the stranded-flush
    /// paths pass it to `Batch::with_nonce`.
    producer_nonce: u64,
    /// Scaling monitor task handle; `None` until
    /// `start_scaling_monitor` installs one.
    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
}
133
/// Worker handles for a single shard.
///
/// The drain worker pumps events from the ring buffer into an mpsc
/// channel; the batch worker reads from that channel and dispatches
/// to the adapter. Shutdown ordering is load-bearing — see
/// `EventBus::shutdown`.
struct ShardWorkers {
    /// Handle of the task that reads the channel and dispatches
    /// batches to the adapter.
    batch: JoinHandle<()>,
    /// Handle of the task that moves events from the ring buffer
    /// into the channel.
    drain: JoinHandle<()>,
    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
    ///
    /// Teardown paths read this AFTER awaiting `batch` to learn the
    /// worker's final post-flush sequence, then use it as the
    /// `sequence_start` for the stranded-ring-buffer flush so the
    /// stranded msg-ids fall strictly past every msg-id the worker
    /// emitted. Without this, JetStream dedup would silently drop
    /// the stranded batch when both used 0.
    next_sequence: Arc<AtomicU64>,
}
150
151/// RAII guard for an in-flight ingest. Decrements
152/// `in_flight_ingests` on drop so `shutdown()` can wait for the
153/// counter to reach zero.
154struct IngestGuard<'a> {
155    bus: &'a EventBus,
156}
157
158impl Drop for IngestGuard<'_> {
159    fn drop(&mut self) {
160        self.bus
161            .in_flight_ingests
162            .fetch_sub(1, AtomicOrdering::SeqCst);
163    }
164}
165
/// Counters and flags describing what the event bus has done so far.
///
/// Every field is an atomic, so the struct can be shared behind an
/// `Arc` and updated by worker tasks while callers read it.
#[derive(Debug, Default)]
pub struct EventBusStats {
    /// Total events ingested.
    pub events_ingested: AtomicU64,
    /// Events dropped due to backpressure.
    pub events_dropped: AtomicU64,
    /// Batches dispatched to adapter.
    ///
    /// This field used to be declared but never incremented. The
    /// Phase 2 progress probe in `flush()` read it as "did the
    /// BatchWorker make progress this `max_delay` window?", observed
    /// `0 == 0`, and always early-broke after a single window. On
    /// Windows-class timer resolution the race against the
    /// BatchWorker's first `recv_timeout` regularly tipped the wrong
    /// way for `flush_is_a_delivery_barrier`. It is now incremented
    /// in the BatchWorker spawn (after a successful `dispatch_batch`)
    /// and in the stranded-flush of `remove_shard_internal`.
    pub batches_dispatched: AtomicU64,
    /// Total events dispatched to the adapter (sum of batch lengths
    /// from successful `on_batch` calls).
    ///
    /// Companion to `batches_dispatched`: by the time `flush()`
    /// returns, `events_dispatched + events_dropped ==
    /// events_ingested`. FFI consumers can also use this to monitor
    /// end-to-end delivery.
    pub events_dispatched: AtomicU64,
    /// Set to `true` if `shutdown()` / `shutdown_via_ref()` had to
    /// proceed past the in-flight-ingest grace deadline (5 s) with
    /// producers still mid-push.
    ///
    /// The stranded count is already added to `events_dropped`, but
    /// `shutdown` returns `Ok(())` regardless, so callers that need
    /// to tell a clean shutdown from a lossy one check this flag in
    /// `bus.stats()` afterward. That is only meaningful for the
    /// `shutdown_via_ref` path, which does not consume the bus.
    ///
    /// Before this flag existed, the warning log and the
    /// `events_dropped` increment were the only signal; the
    /// `Result<(), AdapterError>` returned `Ok`, indistinguishable
    /// from a clean shutdown.
    pub shutdown_was_lossy: AtomicBool,
}
205
206impl EventBus {
207    /// Create a new event bus with the given configuration.
208    pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
209        // Create adapter from config
210        let adapter: Box<dyn Adapter> = match &config.adapter {
211            AdapterConfig::Noop => Box::new(NoopAdapter::new()),
212            #[cfg(feature = "redis")]
213            AdapterConfig::Redis(redis_config) => {
214                Box::new(RedisAdapter::new(redis_config.clone())?)
215            }
216            #[cfg(feature = "jetstream")]
217            AdapterConfig::JetStream(js_config) => {
218                Box::new(JetStreamAdapter::new(js_config.clone())?)
219            }
220            #[cfg(feature = "net")]
221            AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
222        };
223
224        Self::new_with_adapter(config, adapter).await
225    }
226
227    /// Create a new event bus with a caller-supplied adapter.
228    ///
229    /// `config.adapter` is ignored — the supplied `adapter` is used
230    /// instead. Useful for tests that need to observe or inject
231    /// behavior at the adapter boundary (e.g. a counting adapter
232    /// for end-to-end delivery assertions, a flaky adapter for
233    /// retry-path coverage).
234    pub async fn new_with_adapter(
235        config: EventBusConfig,
236        mut adapter: Box<dyn Adapter>,
237    ) -> Result<Self, AdapterError> {
238        config
239            .validate()
240            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
241
242        // Initialize adapter (with timeout to prevent hanging on unreachable backends)
243        tokio::time::timeout(config.adapter_timeout, adapter.init())
244            .await
245            .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
246        let adapter: Arc<dyn Adapter> = Arc::from(adapter);
247
248        // Create shard manager (with or without dynamic scaling)
249        let shard_manager = if let Some(ref scaling_policy) = config.scaling {
250            Arc::new(
251                ShardManager::with_mapper(
252                    config.num_shards,
253                    config.ring_buffer_capacity,
254                    config.backpressure_mode,
255                    scaling_policy.clone(),
256                )
257                .map_err(|e| AdapterError::Fatal(e.to_string()))?,
258            )
259        } else {
260            Arc::new(ShardManager::new(
261                config.num_shards,
262                config.ring_buffer_capacity,
263                config.backpressure_mode,
264            ))
265        };
266
267        // Create poll merger.
268        //
269        // Pass the live id set rather than the count. At
270        // initial construction the ids are dense (`0..num_shards`),
271        // but using `shard_ids()` here keeps a single code path with
272        // the post-scaling re-stores below.
273        let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
274            adapter.clone(),
275            shard_manager.shard_ids(),
276        ));
277
278        // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
279        // doc on `EventBus` for the synchronization contract.
280        let shutdown = Arc::new(AtomicBool::new(false));
281        let drain_finalize_ready = Arc::new(AtomicBool::new(false));
282
283        // Stats are shared with every BatchWorker spawn so successful
284        // dispatches increment `batches_dispatched` / `events_dispatched`
285        // — `flush()`'s Phase 2 progress probe gates on those.
286        let stats = Arc::new(EventBusStats::default());
287
288        // Producer nonce. Persistent path → load-or-create
289        // the durable u64 so cross-process retries dedup against the
290        // prior incarnation. No path → per-process default (today's
291        // at-most-once-across-restart behavior).
292        let producer_nonce = match &config.producer_nonce_path {
293            Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
294                .map_err(|e| {
295                    AdapterError::Fatal(format!(
296                        "failed to load/create producer-nonce file at {}: {e}",
297                        path.display(),
298                    ))
299                })?
300                .nonce(),
301            None => crate::event::batch_process_nonce(),
302        };
303
304        // Create batch workers for each shard
305        let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
306            std::collections::HashMap::with_capacity(config.num_shards as usize);
307        let mut batch_senders =
308            std::collections::HashMap::with_capacity(config.num_shards as usize);
309
310        for shard_id in 0..config.num_shards {
311            let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
312
313            let next_sequence = Arc::new(AtomicU64::new(0));
314
315            let batch = spawn_batch_worker(BatchWorkerParams {
316                shard_id,
317                rx,
318                adapter: adapter.clone(),
319                shard_manager: shard_manager.clone(),
320                config: config.batch.clone(),
321                adapter_timeout: config.adapter_timeout,
322                batch_retries: config.adapter_batch_retries,
323                next_sequence: next_sequence.clone(),
324                stats: stats.clone(),
325                producer_nonce,
326            });
327
328            let drain = spawn_drain_worker_for_shard(
329                shard_id,
330                shard_manager.clone(),
331                tx.clone(),
332                shutdown.clone(),
333                drain_finalize_ready.clone(),
334            );
335
336            batch_workers.insert(
337                shard_id,
338                ShardWorkers {
339                    batch,
340                    drain,
341                    next_sequence,
342                },
343            );
344            batch_senders.insert(shard_id, tx);
345        }
346
347        let bus = Self {
348            shard_manager,
349            adapter,
350            poll_merger,
351            poll_merger_swap_lock: parking_lot::Mutex::new(()),
352            batch_workers: parking_lot::Mutex::new(batch_workers),
353            batch_senders: parking_lot::RwLock::new(batch_senders),
354            shutdown,
355            drain_finalize_ready,
356            in_flight_ingests: AtomicU64::new(0),
357            shutdown_completed: AtomicBool::new(false),
358            config,
359            stats,
360            producer_nonce,
361            scaling_monitor: parking_lot::Mutex::new(None),
362        };
363
364        Ok(bus)
365    }
366
367    /// Start the scaling monitor (if dynamic scaling is enabled).
368    /// This spawns a background task that periodically evaluates scaling decisions.
369    ///
370    /// The spawned task holds a `Weak<Self>` rather than a strong
371    /// `Arc<Self>` clone. With a strong clone the task kept the bus
372    /// alive forever, and `shutdown(self)` (which consumes by value)
373    /// was unreachable: callers with an `Arc<EventBus>` could not
374    /// `Arc::try_unwrap` to consume it because the spawned task always
375    /// held one of the strong refs.
376    ///
377    /// With a `Weak`, the monitor task upgrades each tick. Once the
378    /// last caller-held `Arc` is dropped, the upgrade fails and the
379    /// task exits cleanly. To shut down via `shutdown(self)`, the
380    /// caller must hold the only strong reference: `Arc::try_unwrap`
381    /// on the resulting bus succeeds because the spawned task only
382    /// holds a Weak.
383    pub fn start_scaling_monitor(self: &Arc<Self>) {
384        if self.config.scaling.is_none() {
385            return;
386        }
387
388        // Idempotency check: no-op when a monitor is already
389        // installed. Otherwise a second `start_scaling_monitor`
390        // call would overwrite the slot without aborting the
391        // previous `JoinHandle` — the displaced task would keep
392        // running, holding a `Weak<EventBus>`, only exiting when it
393        // next observed `shutdown` or failed to upgrade. Two
394        // concurrent monitors would briefly compete for
395        // `evaluate_scaling`'s lock, doubling the metrics-tick
396        // wakeup rate.
397        let mut slot = self.scaling_monitor.lock();
398        if slot.is_some() {
399            tracing::debug!("start_scaling_monitor: monitor already running, skipping");
400            return;
401        }
402
403        let weak = Arc::downgrade(self);
404        let handle = tokio::spawn(async move {
405            run_scaling_monitor_via_weak(weak).await;
406        });
407
408        *slot = Some(handle);
409    }
410
411    /// Internal: Add a new shard with its workers.
412    ///
413    /// The previous implementation called `shard_manager.add_shard()`
414    /// first, which atomically marked the shard `Active` and published
415    /// it to the routing table. So `select_shard` could route producer
416    /// pushes to the new id *before* any drain or batch worker existed,
417    /// leaving events queued in a buffer with no consumer (and
418    /// triggering the configured backpressure mode if the buffer
419    /// filled).
420    ///
421    /// The fix uses the two-phase API on `ShardManager`:
422    ///   1. `add_shard()` allocates the id and metrics collector,
423    ///      adds the shard to the routing table in `Provisioning`
424    ///      state — so `with_shard` works (which the drain worker
425    ///      needs) but `select_shard` skips it.
426    ///   2. Spawn batch + drain workers and register the sender.
427    ///   3. `activate_shard()` flips state to `Active`. Only now
428    ///      does `select_shard` start routing producer pushes.
429    async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
430        self.add_shard_internal_with_cooldown_policy(false).await
431    }
432
433    /// Like [`add_shard_internal`] but bypasses the auto-scaling
434    /// cooldown. See [`ShardManager::add_shard_force`].
435    async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
436        self.add_shard_internal_with_cooldown_policy(true).await
437    }
438
439    async fn add_shard_internal_with_cooldown_policy(
440        &self,
441        force: bool,
442    ) -> Result<u16, AdapterError> {
443        // Step 1: provisioning add — not yet selectable.
444        let new_id = if force {
445            self.shard_manager.add_shard_force()
446        } else {
447            self.shard_manager.add_shard()
448        }
449        .map_err(|e| AdapterError::Fatal(e.to_string()))?;
450
451        // Step 2: spawn workers and register the sender.
452        let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
453
454        let next_sequence = Arc::new(AtomicU64::new(0));
455
456        let batch = spawn_batch_worker(BatchWorkerParams {
457            shard_id: new_id,
458            rx,
459            adapter: self.adapter.clone(),
460            shard_manager: self.shard_manager.clone(),
461            config: self.config.batch.clone(),
462            adapter_timeout: self.config.adapter_timeout,
463            batch_retries: self.config.adapter_batch_retries,
464            next_sequence: next_sequence.clone(),
465            stats: self.stats.clone(),
466            producer_nonce: self.producer_nonce,
467        });
468
469        // Order of publish here matters. Today `select_shard`
470        // filters on `state == Active` and a shard is still
471        // `Provisioning` at this point, so producers cannot route
472        // to it yet — but the pattern (insert sender → spawn drain
473        // worker → activate_shard) is ordering-fragile: a future
474        // refactor that flips `activate_shard` ahead of the drain-
475        // worker spawn would leave a window where producers can
476        // push but no drain worker is polling. Pin the order by
477        // installing the sender BEFORE the drain worker (existing
478        // behaviour) so the worker observes a fully-published
479        // sender at spawn time; `activate_shard` runs further down.
480        self.batch_senders.write().insert(new_id, tx.clone());
481
482        let drain = spawn_drain_worker_for_shard(
483            new_id,
484            self.shard_manager.clone(),
485            tx,
486            self.shutdown.clone(),
487            self.drain_finalize_ready.clone(),
488        );
489
490        self.batch_workers.lock().insert(
491            new_id,
492            ShardWorkers {
493                batch,
494                drain,
495                next_sequence,
496            },
497        );
498
499        // Step 3: workers are live — flip the shard to Active so
500        // `select_shard` will route to it.
501        //
502        // On `activate_shard` failure we mirror
503        // `remove_shard_internal`'s teardown: drop the sender,
504        // unmap the provisioning entry (which atomically pops any
505        // residual ring-buffer events), gracefully await both
506        // workers (so the drain worker's `scratch` Vec sends its
507        // contents on the channel and the batch worker's
508        // `current_batch` is flushed via the channel-close path),
509        // then dispatch any stranded ring-buffer events through
510        // the adapter. Pre-fix this used `.abort()` on both
511        // handles which dropped the drain
512        // worker's scratch and the batch worker's current_batch
513        // without dispatch.
514        if let Err(e) = self.shard_manager.activate_shard(new_id) {
515            tracing::warn!(
516                shard_id = new_id,
517                error = %e,
518                "activate_shard failed; rolling back provisioning state"
519            );
520
521            // 1. Drop the bus-side sender. The drain worker still
522            //    holds its own clone, so the channel stays open
523            //    until `with_shard` returns None (step 2) and the
524            //    drain worker breaks out of its loop, dropping
525            //    its sender and finally closing the channel.
526            self.batch_senders.write().remove(&new_id);
527
528            // 2. Atomically pop any ring-buffer residue and
529            //    unmap the Provisioning entry. After this,
530            //    `with_shard(new_id)` returns None and the drain
531            //    worker exits at its next poll (after sending
532            //    any events it had already popped into `scratch`
533            //    on this iteration).
534            //
535            //    For a brand-new Provisioning shard the buffer
536            //    should be empty (`select_shard` skips
537            //    non-Active states), so `stranded` is normally
538            //    `Vec::new()`. The flush below is a defensive
539            //    no-op on the happy path but covers any future
540            //    code path that routes to a Provisioning shard
541            //    or any race window that tucked an event in
542            //    before `activate_shard` returned its error.
543            let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
544
545            // 3. Take ownership of the worker handles and await
546            //    them gracefully. Order: drain first (it pumps
547            //    its scratch + final-sweep contents into the
548            //    channel and exits), then batch (which receives
549            //    those events plus any prior channel residue,
550            //    flushes its own `current_batch`, and exits).
551            //
552            //    Awaiting in this order is what makes the drain
553            //    worker's scratch reach the adapter — the
554            //    drain's `Some(N>0)` arm `mem::replace`s scratch
555            //    into a batch and `sender.send(batch).await`s
556            //    it; that send must complete (or fail) before
557            //    the drain worker breaks. The batch worker's
558            //    `Ok(None)` arm then runs after both senders
559            //    are dropped and flushes any pending batch.
560            // Bound each JoinHandle await so a worker that's
561            // parked inside a slow adapter call (e.g. drain worker
562            // mid-`sender.send().await` against a backpressured
563            // channel because the batch worker is itself blocked
564            // in the adapter) cannot pin rollback indefinitely.
565            // 2x `adapter_timeout` is the natural ceiling: the
566            // batch worker uses `adapter_timeout` per dispatch and
567            // is expected to wake within that window. A timeout
568            // here leaks the JoinHandle — acceptable because
569            // step 1 already removed the bus-side sender, so the
570            // detached task can't observe new work and will exit
571            // on its next loop iteration.
572            let workers = self.batch_workers.lock().remove(&new_id);
573            // `worker_detached` is set when either join times out
574            // — the worker is still running on a leaked
575            // JoinHandle. In that case the `next_sequence` atomic
576            // is no longer a reliable upper bound: the detached
577            // worker may publish a final batch whose msg-ids land
578            // in the same `[next_sequence..N]` range we'd use for
579            // the stranded-flush, producing duplicate XADDs or
580            // JetStream dedup hits. Skip the stranded-flush when
581            // detached and surface the loss explicitly so the
582            // operator sees it.
583            let mut worker_detached = false;
584            let final_next_sequence = if let Some(workers) = workers {
585                let bound = self.config.adapter_timeout.saturating_mul(2);
586                match tokio::time::timeout(bound, workers.drain).await {
587                    Ok(Ok(())) => {}
588                    Ok(Err(err)) => {
589                        tracing::warn!(
590                            shard_id = new_id,
591                            error = %err,
592                            "drain worker JoinHandle errored on activate-failure rollback"
593                        );
594                    }
595                    Err(_) => {
596                        tracing::warn!(
597                            shard_id = new_id,
598                            timeout_ms = bound.as_millis() as u64,
599                            "drain worker did not exit within timeout on activate-failure rollback; detaching"
600                        );
601                        worker_detached = true;
602                    }
603                }
604                match tokio::time::timeout(bound, workers.batch).await {
605                    Ok(Ok(())) => {}
606                    Ok(Err(err)) => {
607                        tracing::warn!(
608                            shard_id = new_id,
609                            error = %err,
610                            "BatchWorker JoinHandle errored on activate-failure rollback"
611                        );
612                    }
613                    Err(_) => {
614                        tracing::warn!(
615                            shard_id = new_id,
616                            timeout_ms = bound.as_millis() as u64,
617                            "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
618                        );
619                        worker_detached = true;
620                    }
621                }
622                workers.next_sequence.load(AtomicOrdering::Acquire)
623            } else {
624                0
625            };
626
627            // 4. Dispatch any stranded events as a single-shot
628            //    batch so they reach durable storage with the
629            //    correct sequence-id segment. Identical to the
630            //    `remove_shard_internal` teardown — but only when
631            //    the worker actually exited. If it timed out and
632            //    is still running on a leaked handle, dispatching
633            //    here would emit msg-ids overlapping the worker's
634            //    final flush; surface the events as dropped
635            //    instead so the duplicate-on-the-wire hazard is
636            //    avoided.
637            if !stranded.is_empty() && worker_detached {
638                let count = stranded.len();
639                self.stats
640                    .events_dropped
641                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
642                self.stats
643                    .shutdown_was_lossy
644                    .store(true, AtomicOrdering::Release);
645                tracing::error!(
646                    shard_id = new_id,
647                    count,
648                    "activate-failure rollback: skipping stranded-flush \
649                     because a worker JoinHandle timed out and may still \
650                     be running; events would collide with the detached \
651                     worker's final batch on the wire. Counted as dropped."
652                );
653            } else if !stranded.is_empty() {
654                let count = stranded.len();
655                let batch = crate::event::Batch::with_nonce(
656                    new_id,
657                    stranded,
658                    final_next_sequence,
659                    self.producer_nonce,
660                );
661                let dispatched = dispatch_batch(
662                    &*self.adapter,
663                    batch,
664                    new_id,
665                    self.config.adapter_timeout,
666                    self.config.adapter_batch_retries,
667                )
668                .await;
669                if dispatched {
670                    self.stats
671                        .batches_dispatched
672                        .fetch_add(1, AtomicOrdering::Relaxed);
673                    self.stats
674                        .events_dispatched
675                        .fetch_add(count as u64, AtomicOrdering::Relaxed);
676                    tracing::info!(
677                        shard_id = new_id,
678                        count,
679                        "activate-failure rollback: flushed stranded events to adapter",
680                    );
681                } else {
682                    tracing::error!(
683                        shard_id = new_id,
684                        count,
685                        "activate-failure rollback: adapter rejected stranded events; \
686                         events lost"
687                    );
688                }
689            }
690
691            return Err(AdapterError::Fatal(e.to_string()));
692        }
693
694        // Update poll merger with the post-add id set. Hold
695        // `poll_merger_swap_lock` across the snapshot-and-store so a
696        // concurrent remove_shard can't sneak between our `shard_ids()`
697        // read and our `arc_swap.store` and clobber the published view
698        // with a stale snapshot.
699        {
700            let _swap_guard = self.poll_merger_swap_lock.lock();
701            self.poll_merger.store(Arc::new(PollMerger::new(
702                self.adapter.clone(),
703                self.shard_manager.shard_ids(),
704            )));
705        }
706
707        tracing::info!(shard_id = new_id, "Added new shard");
708        Ok(new_id)
709    }
710
711    /// Internal: Remove a stopped shard.
712    ///
713    /// Previously this dropped the worker `JoinHandle`s and unmapped
714    /// the shard without first draining its ring buffer. Any events
715    /// still queued at the moment of removal — even just a few from a
716    /// producer that pushed concurrently with the scale-down decision
717    /// — were silently stranded once the drain worker exited on
718    /// `with_shard → None`.
719    ///
720    /// The fix:
721    ///   1. Wait for the drain worker we're about to retire to flush
722    ///      the channel, by closing the bus-side sender first.
723    ///   2. Call `remove_shard`, which atomically pops the
724    ///      ring-buffer remnants and unmaps the shard. The drained
725    ///      events come back to us as a `Vec`.
726    ///   3. Hand those events directly to the adapter as a
727    ///      single-shot batch — bypassing the per-shard pipeline
728    ///      that's already being torn down — so they reach durable
729    ///      storage.
730    async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
731        // Step 1: drop the bus-side sender. The drain worker still
732        // has its own clone and will keep draining; we want it to
733        // exit when its `with_shard` call returns `None` after
734        // step 2's unmap.
735        self.batch_senders.write().remove(&shard_id);
736
737        // Step 2: atomically drain whatever's in the ring buffer and
738        // unmap. After this, `with_shard(shard_id)` returns `None`
739        // and the drain worker exits at its next poll.
740        let stranded = self
741            .shard_manager
742            .remove_shard(shard_id)
743            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
744
745        // Step 3: take ownership of the worker handles (move them
746        // OUT of the mutex map so we can `await` them — `await`
747        // consumes a `JoinHandle`). With the bus-side sender already
748        // dropped (step 1) and the shard unmapped (step 2), the
749        // drain worker exits at its next poll and drops its sender
750        // clone, which closes the BatchWorker's `rx`. The
751        // BatchWorker then flushes any pending `current_batch` and
752        // any events still buffered in the channel, dispatches them
753        // via the standard `dispatch_batch` path with their PROPER
754        // `next_sequence` values, and exits.
755        //
756        // Await order: drain first, then batch. The drain worker's
757        // `Some(N>0)` arm `mem::replace`s scratch into a batch and
758        // `sender.send(batch).await`s it; that send must complete
759        // (or fail) before drain breaks. The batch worker's
760        // `Ok(None)` arm runs after the sender drops and flushes
761        // any pending batch. Awaiting in the reverse order would
762        // park here forever: the batch worker's `recv()` only
763        // returns `None` once every sender clone (including the
764        // drain worker's) has dropped.
765        //
766        // Both awaits are bounded by `2 × adapter_timeout` so a
767        // worker parked inside a slow adapter call cannot pin
768        // teardown indefinitely. A timeout leaks the JoinHandle —
769        // acceptable because step 1 already removed the bus-side
770        // sender and step 2 unmapped the shard, so the detached
771        // task can't observe new work and will exit on its next
772        // loop iteration.
773        let workers = self.batch_workers.lock().remove(&shard_id);
774        let final_next_sequence = if let Some(workers) = workers {
775            let bound = self.config.adapter_timeout.saturating_mul(2);
776            match tokio::time::timeout(bound, workers.drain).await {
777                Ok(Ok(())) => {}
778                Ok(Err(e)) => {
779                    tracing::warn!(
780                        shard_id,
781                        error = %e,
782                        "drain worker JoinHandle errored on await; \
783                         drain worker should have already exited via \
784                         `with_shard -> None`",
785                    );
786                }
787                Err(_) => {
788                    tracing::warn!(
789                        shard_id,
790                        timeout_ms = bound.as_millis() as u64,
791                        "drain worker did not exit within timeout on remove_shard; detaching",
792                    );
793                }
794            }
795            match tokio::time::timeout(bound, workers.batch).await {
796                Ok(Ok(())) => {}
797                Ok(Err(e)) => {
798                    tracing::warn!(
799                        shard_id,
800                        error = %e,
801                        "BatchWorker JoinHandle errored on await; \
802                         proceeding with stranded-flush using last \
803                         published next_sequence",
804                    );
805                }
806                Err(_) => {
807                    tracing::warn!(
808                        shard_id,
809                        timeout_ms = bound.as_millis() as u64,
810                        "BatchWorker did not exit within timeout on remove_shard; detaching",
811                    );
812                }
813            }
814            workers.next_sequence.load(AtomicOrdering::Acquire)
815        } else {
816            // No worker registered for this shard — shouldn't
817            // happen on the normal scale-down path, but defensively
818            // fall back to 0. This branch only activates if a
819            // caller manages to call `remove_shard_internal` for a
820            // shard that was never spawned (or already removed).
821            0
822        };
823
824        // Step 4: flush the stranded ring-buffer events through the
825        // adapter in one shot, using `final_next_sequence` (NOT 0)
826        // as the `sequence_start`. The stranded batch's msg-ids
827        // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
828        // strictly past every msg-id the worker emitted. Using 0
829        // would collide with the worker's very first batch
830        // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
831        // window would silently drop the duplicate.
832        if !stranded.is_empty() {
833            let count = stranded.len();
834            // Use the bus's loaded producer nonce so the stranded
835            // batch's msg-ids share the same producer-identity
836            // segment as everything else this bus has emitted —
837            // critical for cross-process dedup.
838            let batch = crate::event::Batch::with_nonce(
839                shard_id,
840                stranded,
841                final_next_sequence,
842                self.producer_nonce,
843            );
844            let dispatched = dispatch_batch(
845                &*self.adapter,
846                batch,
847                shard_id,
848                self.config.adapter_timeout,
849                self.config.adapter_batch_retries,
850            )
851            .await;
852            if dispatched {
853                self.stats
854                    .batches_dispatched
855                    .fetch_add(1, AtomicOrdering::Relaxed);
856                self.stats
857                    .events_dispatched
858                    .fetch_add(count as u64, AtomicOrdering::Relaxed);
859                tracing::info!(
860                    shard_id,
861                    count,
862                    sequence_start = final_next_sequence,
863                    "Removed shard: flushed stranded ring-buffer events to adapter"
864                );
865            } else {
866                tracing::error!(
867                    shard_id,
868                    count,
869                    sequence_start = final_next_sequence,
870                    "Removed shard: adapter rejected stranded ring-buffer events; \
871                     events lost"
872                );
873            }
874        }
875
876        // Update poll merger with the post-remove id set.
877        // Without this, a default-shards poll (`request.shards == None`)
878        // would still iterate `0..num_shards` and skip the live shard
879        // whose id is now the largest, while polling a nonexistent /
880        // recreated shard at the bottom of the range.
881        //
882        // `poll_merger_swap_lock` serializes against
883        // `add_shard_internal`'s matching block — see the field doc.
884        {
885            let _swap_guard = self.poll_merger_swap_lock.lock();
886            self.poll_merger.store(Arc::new(PollMerger::new(
887                self.adapter.clone(),
888                self.shard_manager.shard_ids(),
889            )));
890        }
891
892        tracing::info!(shard_id = shard_id, "Removed shard");
893        Ok(())
894    }
895
896    /// Try to enter an ingest critical section. Returns `None` if
897    /// shutdown is in progress, in which case the caller must
898    /// return `IngestionError::ShuttingDown` without touching the
899    /// shard manager.
900    ///
901    /// The `fetch_add` + load(`shutdown`) sequence pairs with
902    /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
903    /// `in_flight_ingests`. SeqCst on both sides closes the
904    /// stranding race: every ingest that is observed as in-flight
905    /// during shutdown's wait is guaranteed to complete before the
906    /// drain workers do their final ring-buffer sweep, so no event
907    /// can land in a ring buffer after the drain worker has stopped
908    /// reading from it.
909    #[inline(always)]
910    fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
911        self.in_flight_ingests.fetch_add(1, AtomicOrdering::SeqCst);
912        if self.shutdown.load(AtomicOrdering::SeqCst) {
913            self.in_flight_ingests.fetch_sub(1, AtomicOrdering::SeqCst);
914            return None;
915        }
916        Some(IngestGuard { bus: self })
917    }
918
919    /// Ingest an event.
920    ///
921    /// This is a non-blocking operation. The event is added to the appropriate
922    /// shard's ring buffer and will be batched and persisted asynchronously.
923    ///
924    /// # Returns
925    ///
926    /// The shard ID and insertion timestamp on success.
927    #[inline]
928    pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
929        let _g = self
930            .try_enter_ingest()
931            .ok_or(IngestionError::ShuttingDown)?;
932
933        match self.shard_manager.ingest(event.into_inner()) {
934            Ok((shard_id, ts)) => {
935                self.stats
936                    .events_ingested
937                    .fetch_add(1, AtomicOrdering::Relaxed);
938                Ok((shard_id, ts))
939            }
940            Err(e) => {
941                self.stats
942                    .events_dropped
943                    .fetch_add(1, AtomicOrdering::Relaxed);
944                Err(e)
945            }
946        }
947    }
948
949    /// Ingest a raw event (pre-serialized with cached hash).
950    ///
951    /// This is the fastest ingestion path:
952    /// - Uses pre-computed hash for shard selection (no serialization)
953    /// - Stores bytes directly (no clone needed, reference-counted)
954    ///
955    /// # Returns
956    ///
957    /// The shard ID and insertion timestamp on success.
958    #[inline]
959    pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
960        let _g = self
961            .try_enter_ingest()
962            .ok_or(IngestionError::ShuttingDown)?;
963
964        match self.shard_manager.ingest_raw(event) {
965            Ok((shard_id, ts)) => {
966                self.stats
967                    .events_ingested
968                    .fetch_add(1, AtomicOrdering::Relaxed);
969                Ok((shard_id, ts))
970            }
971            Err(e) => {
972                self.stats
973                    .events_dropped
974                    .fetch_add(1, AtomicOrdering::Relaxed);
975                Err(e)
976            }
977        }
978    }
979
980    /// Ingest a batch of events.
981    ///
982    /// This is more efficient than calling `ingest` repeatedly: events
983    /// destined for the same shard share a single mutex acquisition.
984    ///
985    /// # Returns
986    ///
987    /// The number of successfully ingested events.
988    pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
989        // The shutdown gate lives in `ingest_raw_batch`, which we
990        // forward to. No separate guard here — that would double-
991        // count `in_flight_ingests` (once for this call, once for
992        // the inner call) and could deadlock shutdown under high
993        // contention.
994        let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
995        self.ingest_raw_batch(raw)
996    }
997
998    /// Ingest a batch of raw events (fastest batch ingestion).
999    ///
1000    /// Groups events by their destination shard and pushes each group
1001    /// under a single mutex acquisition.
1002    ///
1003    /// # Returns
1004    ///
1005    /// The number of successfully ingested events.
1006    pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1007        let _g = match self.try_enter_ingest() {
1008            Some(g) => g,
1009            None => return 0,
1010        };
1011
1012        let total = events.len();
1013        let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1014        if success > 0 {
1015            self.stats
1016                .events_ingested
1017                .fetch_add(success as u64, AtomicOrdering::Relaxed);
1018        }
1019        // Subtract `unrouted` from the buffer-fullness drop count
1020        // so the same event isn't tallied in both `events_unrouted`
1021        // (bumped inside `ShardManager::ingest_raw_batch`) and
1022        // `events_dropped` — using a plain `total - success` here
1023        // would double-count unrouted events. Backpressure drops
1024        // = events that reached a shard but failed to push.
1025        let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1026        if dropped > 0 {
1027            self.stats
1028                .events_dropped
1029                .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1030        }
1031        success
1032    }
1033
1034    /// Poll events from the bus.
1035    ///
1036    /// This retrieves events from storage according to the request parameters.
1037    ///
1038    /// # Topology-change visibility
1039    ///
1040    /// `ArcSwap::load()` snapshots the current `PollMerger` for
1041    /// the duration of this call. A concurrent `add_shard` /
1042    /// `remove_shard_internal` that `.store()`s a fresh merger
1043    /// only affects **subsequent** polls — this poll continues
1044    /// against the loaded snapshot's shard list.
1045    ///
1046    /// The implications:
1047    ///   - **add_shard mid-poll:** events ingested into the new
1048    ///     shard between the merger swap and our return are
1049    ///     invisible to this call. They appear on the next poll.
1050    ///   - **remove_shard_internal mid-poll:** the stale merger
1051    ///     still has the removed shard in its id list. Adapters
1052    ///     that lazy-create streams on `poll_shard` (JetStream
1053    ///     in particular) may recreate the stream and return
1054    ///     empty/stale data. The drained events are dispatched
1055    ///     to durable storage by `remove_shard_internal` itself
1056    ///     before this poll's adapter call lands; the next poll
1057    ///     loads the new merger and sees the correct shard set.
1058    ///
1059    /// In both cases the loss is transient and self-healing:
1060    /// pagination via `next_id` and the next poll's cursor pick
1061    /// up where we left off. Callers requiring strict
1062    /// "topology-stable" semantics should serialize their polls
1063    /// against scaling operations externally.
1064    pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1065        let merger = self.poll_merger.load();
1066        merger.poll(request).await
1067    }
1068
1069    /// Get the number of shards.
1070    pub fn num_shards(&self) -> u16 {
1071        self.shard_manager.num_shards()
1072    }
1073
1074    /// Get the adapter name.
1075    pub fn adapter_name(&self) -> &'static str {
1076        self.adapter.name()
1077    }
1078
1079    /// Check if the adapter is healthy.
1080    pub async fn is_healthy(&self) -> bool {
1081        self.adapter.is_healthy().await
1082    }
1083
1084    /// Get statistics.
1085    pub fn stats(&self) -> &EventBusStats {
1086        &self.stats
1087    }
1088
1089    /// Get shard statistics.
1090    pub fn shard_stats(&self) -> crate::shard::ShardStats {
1091        self.shard_manager.stats()
1092    }
1093
1094    /// Sum of `len()` across every shard's ring buffer.
1095    ///
1096    /// Mainly useful in tests and operational diagnostics: a
1097    /// non-zero value at the time of `Drop` (without an awaited
1098    /// `shutdown()`) would be silently lost, so `Drop` folds this
1099    /// into `events_dropped` before the bus disappears.
1100    pub fn pending_in_rings(&self) -> u64 {
1101        self.shard_manager.total_pending_in_rings()
1102    }
1103
1104    /// Flush all pending batches.
1105    ///
1106    /// Waits for all shard ring buffers to drain, then for the
1107    /// per-shard mpsc channels to drain, then for any pending batch
1108    /// inside each batch worker to time out and dispatch — and only
1109    /// then calls `adapter.flush()`.
1110    ///
1111    /// # Latency bound
1112    ///
1113    /// The total wall-clock budget is the sum of three phases:
1114    ///   * Phase 1 (ring-buffer drain): up to **5 s**.
1115    ///   * Phase 2 (channel + pending-batch drain): up to
1116    ///     `min(2 s, batch.max_delay × n_workers)` — capped at 2 s
1117    ///     so a misconfigured `max_delay` cannot inflate the budget.
1118    ///   * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
1119    ///     (default **30 s**).
1120    ///
1121    /// Worst-case `flush()` runtime is therefore **~37 s under
1122    /// default config**, NOT 5 s as an earlier doc-comment stated.
1123    /// Callers wiring `flush()` into request-path latencies (HTTP
1124    /// handler, RPC) MUST set `adapter_timeout` accordingly or run
1125    /// the flush under their own outer timeout. The 5-second figure
1126    /// describes Phase 1 only; the doc was misleading and is fixed
1127    /// here.
1128    ///
1129    /// The previous implementation slept a single `batch.max_delay`
1130    /// (default 10 ms) after the ring buffers drained and immediately
1131    /// called `adapter.flush()`. Events still in transit through the
1132    /// per-shard mpsc channel, the batch worker's pending batch, or
1133    /// the in-progress `adapter.on_batch` call (bounded only by
1134    /// `adapter_timeout`, default 30 s) could miss the flush. Callers
1135    /// using `flush()` as a delivery barrier silently lost events.
1136    pub async fn flush(&self) -> Result<(), AdapterError> {
1137        let start = tokio::time::Instant::now();
1138        let timeout = Duration::from_secs(5);
1139        let mut backoff = Duration::from_micros(100);
1140
1141        // Phase 1: wait for ring buffers to drain (drain workers
1142        // pump them into the per-shard mpsc channels).
1143        loop {
1144            if self.shard_manager.all_shards_empty() {
1145                break;
1146            }
1147            if start.elapsed() >= timeout {
1148                tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
1149                break;
1150            }
1151            tokio::time::sleep(backoff).await;
1152            backoff = (backoff * 2).min(Duration::from_millis(10));
1153        }
1154
1155        // Phase 2: wait until every event ingested before flush()
1156        // started has been handed to the adapter via `on_batch`.
1157        // Snapshot the `events_ingested` counter at flush entry —
1158        // that's our target. We then poll `events_dispatched` (sum
1159        // of batch lengths from successful adapter dispatches) plus
1160        // `events_dropped` (events the adapter rejected after retry
1161        // exhaustion or that never made it past backpressure). The
1162        // barrier is met when `events_dispatched + events_dropped >=
1163        // target`: every pre-flush ingest is accounted for in one
1164        // bucket or the other.
1165        //
1166        // This is a true delivery barrier — it doesn't rely on
1167        // "no progress this window" heuristics that race a
1168        // BatchWorker whose `batch_start` was set just before
1169        // flush() ran. A progress gate that reads
1170        // `batches_dispatched` only works if that counter is
1171        // actually incremented on every dispatch, and Windows
1172        // timer resolution alone has historically made any
1173        // single-`max_delay`-sleep approach a frequent flake.
1174        let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1175        let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1176        let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1177
1178        // Outer deadline still bounds Phase 2 in case a wedged
1179        // adapter never returns. `max_delay * num_workers` is the
1180        // worst-case shape (one partially-filled batch per worker,
1181        // each waiting its full `max_delay` to time out), capped at
1182        // 2 s — same upper bound as before.
1183        //
1184        // Read the worker count via the shard manager's atomic-
1185        // backed `num_shards()` rather than `batch_workers.lock()
1186        // .len()`. The previous spinlock-backed `.lock()` inside
1187        // an `async fn` could stall the runtime worker thread
1188        // under contention with concurrent `add_shard_internal` /
1189        // `remove_shard_internal` callers; the atomic accessor is
1190        // both faster and async-safe. Mismatch in the
1191        // worker-count vs shard-count snapshot only changes the
1192        // phase2 deadline by at most one `max_delay` step, which
1193        // is bounded by the outer 2s cap regardless.
1194        let n_workers = usize::from(self.shard_manager.num_shards());
1195        let phase2_budget = self
1196            .config
1197            .batch
1198            .max_delay
1199            .saturating_mul(n_workers.max(1) as u32);
1200        let phase2_deadline =
1201            tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));
1202
1203        // Inner poll cadence: re-check the counters every 1 ms (or
1204        // `max_delay / 16`, whichever is larger). The fast cadence
1205        // means we exit promptly once the BatchWorker dispatches,
1206        // rather than waking exactly once per `max_delay` and
1207        // potentially racing the dispatch by a few ms.
1208        let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
1209        loop {
1210            let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1211            let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1212            // The barrier: every event ingested pre-flush has been
1213            // either dispatched or dropped. The bus's invariant
1214            // `events_ingested = events_dispatched + events_dropped`
1215            // holds at quiescence; we wait until
1216            // `dispatched + dropped >= target_ingested`.
1217            //
1218            // Pre-fix this used `dispatched + (dropped -
1219            // dropped_at_start) >= target_ingested`, which under-
1220            // counted by `dropped_at_start`: even after every
1221            // pre-flush event was processed, the inequality
1222            // required `dropped_at_start` MORE post-flush events
1223            // before signalling done. Workloads with no post-flush
1224            // ingest hung at the barrier until the deadline fired.
1225            //
1226            // Cross-shard race remains: a fast shard's post-flush
1227            // dispatches can satisfy the global target while a
1228            // slow shard's pre-flush events linger in its mpsc
1229            // channel or pending batch. `all_shards_empty()`
1230            // checks ring buffers but not those downstream
1231            // queues. Operators relying on flush as a hard
1232            // delivery barrier should call it during quiet
1233            // ingest, or in `shutdown` (which gates ingest via
1234            // `try_enter_ingest`).
1235            let _ = dispatched_at_start; // reserved for future per-shard accounting
1236            if dispatched.saturating_add(dropped) >= target_ingested
1237                && self.shard_manager.all_shards_empty()
1238            {
1239                break;
1240            }
1241            if tokio::time::Instant::now() >= phase2_deadline {
1242                tracing::warn!(
1243                    target = target_ingested,
1244                    dispatched,
1245                    dropped = dropped.saturating_sub(dropped_at_start),
1246                    "flush: Phase 2 deadline reached before all ingested events were dispatched",
1247                );
1248                break;
1249            }
1250            tokio::time::sleep(poll_interval).await;
1251        }
1252
1253        // Phase 3: tell the adapter to flush whatever it has
1254        // buffered. Bounded by `adapter_timeout` so a hanging
1255        // adapter can't pin us forever.
1256        match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
1257            Ok(r) => r,
1258            Err(_) => {
1259                tracing::warn!(
1260                    "flush: adapter.flush timed out after {:?}",
1261                    self.config.adapter_timeout
1262                );
1263                Err(AdapterError::Fatal("adapter flush timed out".into()))
1264            }
1265        }
1266    }
1267
1268    /// Gracefully shut down the event bus.
1269    ///
1270    /// The shutdown order is load-bearing:
1271    ///
1272    ///   1. Signal `shutdown` so drain workers stop pulling from
1273    ///      ring buffers after their final sweep.
1274    ///   2. Await **drain workers** so every event the producer
1275    ///      has handed to the bus is now in the per-shard mpsc
1276    ///      channel.
1277    ///   3. Drop `batch_senders` so each channel's last sender is
1278    ///      gone — the next `recv().await` in a batch worker will
1279    ///      return `None`.
1280    ///   4. Await **batch workers**, which drain everything
1281    ///      remaining in their channel and exit on `recv() = None`.
1282    ///
1283    /// Reversing steps 2 and 4 (the previous design) silently
1284    /// dropped events: a batch worker that exited on the shutdown
1285    /// flag could leave events the drain worker pushed *after* its
1286    /// `try_recv` sweep stranded in the channel.
1287    pub async fn shutdown(self) -> Result<(), AdapterError> {
1288        self.shutdown_via_ref().await
1289    }
1290
1291    /// Shutdown via shared reference — same semantics as
1292    /// [`shutdown`](Self::shutdown), but does not consume `self`.
1293    ///
1294    /// Useful for callers that hold the bus behind `Arc<EventBus>`
1295    /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1296    /// into every `EventStream`) and therefore cannot satisfy
1297    /// `Arc::try_unwrap`. Idempotent: the first caller does the
1298    /// work; concurrent or subsequent callers wait for the
1299    /// `shutdown_completed` flag and return `Ok(())`.
1300    pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1301        // 1. CAS the shutdown flag false→true. SeqCst pairs with
1302        // `try_enter_ingest`'s shutdown check — any producer that
1303        // observed the previous `false` and is mid-push has its
1304        // `in_flight_ingests` increment ordered before this store
1305        // (the CAS-success branch is a release of the new `true`),
1306        // and so will be visible to the wait below.
1307        //
1308        // If the CAS loses (someone else — typically a concurrent
1309        // call or `Drop` — already flipped the flag), spin until
1310        // they finish. We can't run the rest of the body because
1311        // workers/senders may already be partially torn down.
1312        if self
1313            .shutdown
1314            .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1315            .is_err()
1316        {
1317            // Bound the wait so a `Drop`-only path (which sets
1318            // `shutdown=true` but never sets `shutdown_completed`)
1319            // doesn't spin forever.
1320            //
1321            // Distinguish the two outcomes for callers. If
1322            // `shutdown_completed` flips inside the window, we
1323            // return `Ok(())` and the caller can be sure the first
1324            // caller finished. If the deadline fires first, we
1325            // surface `AdapterError::Transient(_)` — the bus IS
1326            // being shut down (the flag is set), but completion is
1327            // not yet observable; the caller can treat this as
1328            // "another thread is working on it, retry the
1329            // is_shutdown_completed() poll if you need a hard
1330            // barrier."
1331            //
1332            // Returning `Ok(())` in both branches would let
1333            // shutdown-done assumptions silently drift under a slow
1334            // adapter (`adapter_timeout` default 30 s > the 10 s
1335            // spin deadline), letting subsequent code observe a
1336            // partially-shut-down bus.
            // 10s in production builds; overridable via the
            // `SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS` global atomic
            // in test builds so the slow-first-caller test doesn't
            // need to wall-clock-wait for the full deadline. The
            // override is `#[cfg(test)]`-only; production cargo
            // builds compile out the override entirely.
1343            let deadline_dur = shutdown_via_ref_spin_deadline();
1344            // Use `tokio::time::Instant` so tests using
1345            // `tokio::time::pause()` virtualize this clock too.
1346            // Pre-fix `std::time::Instant` was wall-clock and
1347            // ignored `pause()`, breaking timeout-bounded tests
1348            // that wanted to fast-forward the spin deadline.
1349            let deadline = tokio::time::Instant::now() + deadline_dur;
1350            while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1351                if tokio::time::Instant::now() >= deadline {
1352                    return Err(AdapterError::Transient(
1353                        "shutdown_via_ref: another caller is mid-shutdown; \
1354                         deadline elapsed before shutdown_completed \
1355                         flipped. The bus IS shutting down; poll \
1356                         is_shutdown_completed() if you need a hard \
1357                         barrier."
1358                            .into(),
1359                    ));
1360                }
1361                // `yield_now` re-queues immediately and keeps the
1362                // task hot, starving the workers we're waiting on
1363                // under contention. A short `sleep` parks the task
1364                // and lets the runtime schedule the workers.
1365                tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1366            }
1367            return Ok(());
1368        }
1369
1370        // 1a. Wait for in-flight ingests to drain BEFORE the drain
1371        // workers do their final ring-buffer sweep. Otherwise a
1372        // producer that observed `shutdown=false` could push *after*
1373        // the drain worker's last `pop_batch_into` returned zero,
1374        // leaving the event stranded in the ring buffer when the bus
1375        // is dropped.
1376        //
1377        // This is bounded: every producer either bails on the
1378        // SeqCst-synchronized shutdown check (no progress past the
1379        // increment) or completes its single non-blocking push and
1380        // decrements. Both paths take constant time; the total
1381        // wait is O(producer threads).
1382        //
1383        // The "every observed in-flight ingest completes before
1384        // the final sweep" property holds under normal conditions,
1385        // but the 5-second deadline below forces the gate open
1386        // even when producers are still in their push window. A
1387        // producer that has incremented `in_flight_ingests` (and
1388        // so observed `shutdown=false`) but whose push is delayed
1389        // past the deadline (heavy contention, debugger hit, etc.)
1390        // will complete its push AFTER the final sweep — its event
1391        // lands in the ring buffer and is never read. The deadline
1392        // exists so a stuck producer can't deadlock shutdown
1393        // indefinitely; the trade-off is documented data loss past
1394        // the 5 s window, surfaced via the `events_dropped` stat
1395        // (so the loss is observable to operators) and the `WARN`
1396        // log below (so it's diagnosable). The "no stranding"
1397        // promise on the happy path stands; the deadline path is
1398        // the documented escape hatch.
1399        // Use `tokio::time::Instant` so tests using
1400        // `tokio::time::pause()` virtualize this 5-second
1401        // deadline too — pre-fix the `std::time::Instant`
1402        // was wall-clock and ignored the test's paused clock.
1403        let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1404        // Snapshot of `(stranded, ingested, dispatched)` at the
1405        // deadline — Some(...) iff we hit the deadline path. The
1406        // post-drain reconciliation reads this to compute the
1407        // actual drop count (see comment further down).
1408        let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1409        while self.in_flight_ingests.load(AtomicOrdering::SeqCst) > 0 {
1410            if tokio::time::Instant::now() >= in_flight_deadline {
1411                let stranded = self.in_flight_ingests.load(AtomicOrdering::SeqCst);
1412                let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1413                let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1414                tracing::warn!(
1415                    in_flight = stranded,
1416                    lossy = true,
1417                    "shutdown timed out waiting for in-flight ingests after 5s; \
1418                     proceeding — up to {} events may strand in the ring buffer \
1419                     past final drain (documented data-loss path)",
1420                    stranded,
1421                );
1422                // Set the lossy flag immediately so a fast `is_*`
1423                // poll observes the outcome before the drain
1424                // finishes. The actual `events_dropped` bump is
1425                // deferred until after the final drain runs (see
1426                // "post-drain reconciliation" below) so we don't
1427                // double-count events that the drain still
1428                // successfully delivers — pre-fix this bumped
1429                // `events_dropped += stranded` here and the same
1430                // events that the final sweep then drained landed
1431                // in BOTH `events_ingested` and `events_dropped`,
1432                // breaking the bus's
1433                // `ingested == dispatched + dropped` invariant
1434                // and turning `shutdown_was_lossy` into a false
1435                // positive on every deadline-triggered shutdown.
1436                self.stats
1437                    .shutdown_was_lossy
1438                    .store(true, AtomicOrdering::Release);
1439                deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1440                break;
1441            }
1442            // Park instead of `yield_now`. The producers we're
1443            // waiting on contend for the same runtime threads;
1444            // re-queuing immediately starves their progress.
1445            tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1446        }
1447
1448        // 1b. Release the drain-finalize gate.
1449        //
1450        // Pre-fix used `Ordering::Release` for this store
1451        // and relied on the SeqCst spin above (loading
1452        // `in_flight_ingests`) to provide the happens-before for
1453        // every observed-pre-shutdown push. That works today
1454        // because SeqCst loads carry an implicit fence, but a
1455        // future change to the spin's ordering (Relaxed for perf,
1456        // say) would silently break the drain worker's final-sweep
1457        // contract — producer pushes might not be visible to
1458        // `pop_batch_into`. Promote to SeqCst so the load-bearing
1459        // happens-before is explicit at this site, not derived
1460        // from another atomic's ordering choice.
1461        //
1462        // Caveat: the SeqCst happens-before above only covers the
1463        // *non-deadline* exit from the spin (the loop condition
1464        // observed `in_flight_ingests == 0`). On the deadline-break
1465        // path the loop exits with `in_flight_ingests > 0` — the
1466        // outstanding producer pushes have NOT been observed
1467        // synchronized-with this thread, and the SeqCst store below
1468        // does not retroactively create a happens-before with
1469        // pushes that are still mid-flight. Those events are
1470        // exactly the "stranded" events accounted via
1471        // `events_dropped` and `shutdown_was_lossy`; the contract
1472        // is "every observed-pre-shutdown push is visible to the
1473        // final sweep on the happy path; on the deadline path,
1474        // up to `stranded` events past the gate are surfaced as
        // dropped". The `shutdown_was_lossy` flag (set on the
        // deadline branch of the in-flight wait above) is the
        // operator-visible signal that this contract was
        // exercised on the lossy branch.
1478        self.drain_finalize_ready
1479            .store(true, AtomicOrdering::SeqCst);
1480
1481        // Stop the scaling monitor first — it's independent of the
1482        // ingestion path and just needs to observe the flag.
1483        let scaling_handle = self.scaling_monitor.lock().take();
1484        if let Some(handle) = scaling_handle {
1485            let _ = handle.await;
1486        }
1487
1488        // Take workers without holding the lock across await.
1489        let workers = std::mem::take(&mut *self.batch_workers.lock());
1490
1491        // 2. Await drain workers. Each one pops a final batch (up
1492        //    to 10k events) from its ring buffer, sends it on the
1493        //    channel, and exits. After this loop, every event in
1494        //    the ring buffers has been pushed to its channel.
1495        //
1496        // `join_all` lets the runtime overlap drain handles. A
1497        // sequential `for ... { drain.await; }` would serialize
1498        // shutdown wall-clock as N×T instead of max(T), which on
1499        // the default 1024-shard config × per-shard final-drain
1500        // time makes shutdown painful.
1501        let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1502            .into_iter()
1503            .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1504                ((shard_id, drain), (shard_id, batch))
1505            })
1506            .unzip();
1507
1508        // Surface drain-worker JoinErrors explicitly. The default
1509        // Tokio runtime does NOT log spawned-task panics, so a
1510        // `let _ = join_all(...)` would silently swallow a panic
1511        // and mask stranded events. tracing::error per failure
1512        // makes the incident grep-able post-mortem.
1513        let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1514        let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1515        for (shard_id, result) in drain_ids
1516            .iter()
1517            .copied()
1518            .zip(futures::future::join_all(drain_handles).await)
1519        {
1520            if let Err(e) = result {
1521                tracing::error!(
1522                    shard_id,
1523                    error = %e,
1524                    "drain worker JoinHandle errored on shutdown await"
1525                );
1526            }
1527        }
1528
1529        // 3. Drop the original senders so the channels close once
1530        //    drain-worker sender clones (already dropped above)
1531        //    are gone too. Without this, batch workers would block
1532        //    on `recv().await` forever.
1533        drop(std::mem::take(&mut *self.batch_senders.write()));
1534
1535        // 4. Await batch workers. They drain their channel until
1536        //    `recv() = None`, flush, and exit.
1537        //
1538        // Same parallelization as the drain phase, with the same
1539        // explicit JoinError surfacing.
1540        let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1541        for (shard_id, result) in drain_ids
1542            .into_iter()
1543            .zip(futures::future::join_all(batch_only).await)
1544        {
1545            if let Err(e) = result {
1546                tracing::error!(
1547                    shard_id,
1548                    error = %e,
1549                    "BatchWorker JoinHandle errored on shutdown await"
1550                );
1551            }
1552        }
1553
1554        // Flush and shutdown adapter (with timeout to prevent hanging)
1555        let timeout = self.config.adapter_timeout;
1556        if tokio::time::timeout(timeout, self.adapter.flush())
1557            .await
1558            .is_err()
1559        {
1560            tracing::error!("Adapter flush timed out during shutdown");
1561        }
1562        let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1563            .await
1564            .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1565
1566        // Post-drain reconciliation for the lossy-shutdown path.
1567        //
1568        // If we hit the in-flight deadline above, `deadline_snapshot`
1569        // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1570        // Some of those `stranded` producers' events landed in the
1571        // ring AFTER our deadline check but BEFORE the
1572        // `drain_finalize_ready` gate flipped — those events are
1573        // now successfully ingested, drained, and dispatched through
1574        // the adapter. They appear in `events_dispatched` (the
1575        // delta since the deadline), so:
1576        //
1577        //   actual_drops = stranded
1578        //                  - (dispatched_after_drain - dispatched@deadline)
1579        //                  - (ingested_after_drain - ingested@deadline)
1580        //                       only counting events that landed but
1581        //                       weren't dispatched (dropped under
1582        //                       backpressure, etc.)
1583        //
1584        // The cleaner reconciliation: events that completed
1585        // `try_enter_ingest` AFTER the deadline either completed
1586        // ingest (bumping `events_ingested`) or were dropped on
1587        // backpressure (bumping `events_dropped` from the existing
1588        // backpressure paths). The `stranded - delta_ingested`
1589        // remainder is producers whose `try_enter_ingest` succeeded
1590        // but never reached `shard_manager.ingest()` — those are
1591        // the genuinely-lost events we should account for.
1592        if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1593            let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1594            let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1595            // Known under-count window: a producer that completed
1596            // `shard_manager.ingest()` and bumped `events_ingested`
1597            // just BEFORE its `IngestGuard` decremented
1598            // `in_flight_ingests` will be counted in
1599            // `ingested_at_deadline` (already past the bump) AND in
1600            // `stranded` (still pending the decrement on that same
1601            // spin). That single event contributes `+1` to both
1602            // sides of the subtraction, so the saturating_sub
1603            // under-counts the drop by 1 per such interleaving.
1604            // The opposite direction (producer in-flight at
1605            // deadline that never completed ingest) is correctly
1606            // counted as a drop, so the bias is one-sided and
1607            // small (bounded by the number of producers mid-push
1608            // at the exact deadline moment — typically 0–1 even
1609            // under heavy load). Operators reading
1610            // `shutdown_was_lossy` get the right boolean; the
1611            // numeric `events_dropped` may under-count by a few
1612            // events on a lossy shutdown. Acceptable; the cost of
1613            // closing the window is paired-SeqCst reloads under
1614            // the spin which would dominate the shutdown path.
1615            let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1616            if actual_drops > 0 {
1617                self.stats
1618                    .events_dropped
1619                    .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1620            } else {
1621                // The deadline tripped the eager
1622                // `shutdown_was_lossy = true` set above, but the
1623                // final drain ingested every stranded event so
1624                // nothing was actually lost. Clear the flag so
1625                // operator dashboards alerting on
1626                // `was_lossy && events_dropped == 0` don't see a
1627                // false positive. Pre-fix the boolean stayed
1628                // `true` against `events_dropped == 0` for any
1629                // deadline-triggered shutdown whose drain
1630                // happened to catch up.
1631                self.stats
1632                    .shutdown_was_lossy
1633                    .store(false, AtomicOrdering::Release);
1634            }
1635            tracing::warn!(
1636                stranded_at_deadline = stranded,
1637                post_deadline_ingests,
1638                actual_drops,
1639                "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1640                 by stranded - post-deadline-ingests (pre-fix this bumped by \
1641                 the full `stranded` count, double-counting events the drain \
1642                 still successfully delivered)",
1643            );
1644        }
1645
1646        // Mark shutdown as completed so Drop knows not to warn.
1647        self.shutdown_completed.store(true, AtomicOrdering::Release);
1648        result
1649    }
1650
1651    /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1652    /// not imply the shutdown work has finished. Use
1653    /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1654    /// completion.
1655    pub fn is_shutdown(&self) -> bool {
1656        self.shutdown.load(AtomicOrdering::Acquire)
1657    }
1658
1659    /// True once `shutdown` / `shutdown_via_ref` has fully drained
1660    /// workers and the adapter shutdown returned (success path only).
1661    pub fn is_shutdown_completed(&self) -> bool {
1662        self.shutdown_completed.load(AtomicOrdering::Acquire)
1663    }
1664
1665    /// Get shard metrics (if dynamic scaling is enabled).
1666    pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1667        self.shard_manager.collect_metrics()
1668    }
1669
1670    /// Check if dynamic scaling is enabled.
1671    pub fn is_dynamic_scaling_enabled(&self) -> bool {
1672        self.config.scaling.is_some()
1673    }
1674
1675    /// Manually trigger a scale-up (for testing or manual intervention).
1676    ///
1677    /// Bypasses the auto-scaling cooldown so a deliberate operator
1678    /// request isn't rate-limited by the auto-scaling cadence.
1679    /// Pre-fix this looped `add_shard_internal()` N times, each
1680    /// of which bumped `last_scaling`, so iteration 1+ failed
1681    /// with `InCooldown` against any non-zero cooldown — the
1682    /// first shard was left half-added (workers spawned, routing
1683    /// entry installed) while the error propagated to the
1684    /// caller. The `max_shards` budget check still applies.
1685    pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1686        let mut new_ids = Vec::with_capacity(count as usize);
1687        for _ in 0..count {
1688            let id = self.add_shard_internal_force().await?;
1689            new_ids.push(id);
1690        }
1691        Ok(new_ids)
1692    }
1693
1694    /// Manually trigger a scale-down (for testing or manual intervention).
1695    ///
1696    /// Marks `count` shards as `Draining`, waits for them to empty,
1697    /// finalizes them to `Stopped`, and removes them from the
1698    /// routing table — mirroring the scaling monitor's per-tick
1699    /// finalize loop. Returns the IDs of shards that were
1700    /// successfully drained AND removed (subset of those marked
1701    /// Draining if any failed to empty within the deadline).
1702    ///
1703    /// Drives the full scale-down lifecycle synchronously: a
1704    /// plain `mapper.scale_down` call marks shards `Draining` but
1705    /// does NOT finalize them — finalization is the scaling
1706    /// monitor's responsibility. Bus configs without an active
1707    /// monitor (or callers that shut down before the monitor's
1708    /// next tick) would otherwise lose any events queued in those
1709    /// shards' ring buffers.
1710    pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1711        let mapper = self
1712            .shard_manager
1713            .mapper()
1714            .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1715
1716        let drained_ids = mapper
1717            .scale_down(count)
1718            .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1719
1720        // `finalize_draining` requires the shard to have been
1721        // Draining for >100ms with an empty ring buffer and no
1722        // pushes since drain start. Poll until every requested
1723        // shard finalizes, capped by an outer deadline so a wedged
1724        // producer can't pin this method forever. Use
1725        // `tokio::time::Instant` so tests under `tokio::time::pause()`
1726        // advance the virtual clock via `sleep` rather than spinning
1727        // until wall-clock catches up.
1728        let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1729        let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1730        let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1731
1732        while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1733            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1734            let stopped = mapper.finalize_draining();
1735            // `finalize_draining` is destructive — every qualifying
1736            // Draining shard transitions to Stopped in one shot,
1737            // regardless of who initiated the drain. Pre-fix the
1738            // `if target.contains(&shard_id)` filter dropped non-
1739            // target ids on the floor; if the scaling monitor (or
1740            // a parallel `manual_scale_down` on a different target
1741            // set) finalized one of THEIR shards in the same tick,
1742            // that shard ended up Stopped with workers + routing
1743            // entry intact — leaked. Always tear down via
1744            // `remove_shard_internal` so the bus-side state
1745            // (workers, sender, routing) is freed; only count the
1746            // target subset toward the returned Vec.
1747            for shard_id in stopped {
1748                let _ = self.remove_shard_internal(shard_id).await;
1749                if target.contains(&shard_id) {
1750                    finalized.insert(shard_id);
1751                }
1752            }
1753        }
1754
1755        // Surface partial success at WARN level. The return shape
1756        // is preserved for compat (changing to `(Vec, Vec)` would
1757        // break the existing callers + test) so a smaller-than-
1758        // target list could otherwise be silently mistaken for full
1759        // success; the WARN log gives operations tooling something
1760        // to alert on.
1761        if finalized.len() < target.len() {
1762            let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1763            tracing::warn!(
1764                requested = target.len(),
1765                finalized = finalized.len(),
1766                still_draining = ?still_draining,
1767                "manual_scale_down deadline elapsed before all targeted \
1768                 shards finalized — events still in-flight on the listed \
1769                 shards. They will finalize on the next scaling-monitor \
1770                 tick or on shutdown."
1771            );
1772        }
1773
1774        Ok(finalized.into_iter().collect())
1775    }
1776}
1777
1778impl Drop for EventBus {
1779    fn drop(&mut self) {
1780        // Signal shutdown so background tasks (drain workers, batch
1781        // workers, scaling monitor) observe the flag and exit. We
1782        // cannot await futures in Drop, but setting the atomic flag
1783        // triggers eventual termination.
1784        //
1785        // Previously used `Release` here while `try_enter_ingest`
1786        // and `shutdown()` use `SeqCst` on the same flag. `&mut self`
1787        // exclusion makes that sound today (no concurrent ingest can
1788        // observe a half-published shutdown). The mismatch is purely
1789        // defensive — switching to `SeqCst` matches the rest of the
1790        // lifecycle and removes a footgun if a future change ever
1791        // opened a path where `Drop` overlaps an in-flight
1792        // `try_enter_ingest`.
1793        self.shutdown.store(true, AtomicOrdering::SeqCst);
1794        // Also release the drain-finalize gate so any drain worker
1795        // already parked waiting for it can proceed and exit. Without
1796        // this, drop-without-shutdown leaves drain workers blocked on
1797        // `drain_finalize_ready` until their internal deadline fires
1798        // (which delays task cleanup by `DRAIN_FINALIZE_TIMEOUT`).
1799        // Best-effort durability: drop never gets the in-flight wait,
1800        // so any push that lands after this point is still lost.
1801        self.drain_finalize_ready
1802            .store(true, AtomicOrdering::SeqCst);
1803
1804        // Workers do NOT hold `Arc<EventBus>` — they hold
1805        // independent `Arc<ShardManager>` / `Arc<dyn Adapter>`
1806        // clones plus the channel halves. When `Drop` returns,
1807        // those Arcs survive in the still-running tasks and they
1808        // continue draining / dispatching until they observe the
1809        // shutdown flag we just set. There's no partial-Drop UB
1810        // risk: nothing on the worker side dereferences a
1811        // dropped EventBus field. The flags promote the
1812        // worker tasks from "blocked on recv / parked on
1813        // drain_finalize_ready" to "observe shutdown=true and
1814        // exit" so they don't linger indefinitely.
1815        //
1816        // If `shutdown()` was never awaited, any events still in the
1817        // per-shard ring buffers or mpsc channels are lost — the
1818        // adapter's `flush()` and `shutdown()` won't run, so durable
1819        // backends never see them. We can't fix that from `Drop` (no
1820        // async), but we *can* surface the data-loss risk loudly so
1821        // it doesn't hide. The check is bounded to "shutdown was
1822        // never started"; an in-progress shutdown is fine because the
1823        // call site is awaiting it.
1824        if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1825            // Count events still sitting in shard ring buffers. They
1826            // are stranded — the drain workers will see `shutdown =
1827            // true` and exit without flushing, the adapter's
1828            // `flush()`/`shutdown()` never run, so anything in the
1829            // rings at this point is permanently lost. Surface that
1830            // loss via `events_dropped` so post-mortem stats reflect
1831            // reality (operators alerting on `events_dropped > 0`
1832            // would otherwise miss the entire incident), and set
1833            // `shutdown_was_lossy` so the boolean view is consistent
1834            // with the counter view.
1835            //
1836            // Events in the BatchWorker mpsc channels or pending
1837            // batches are not counted here — those workers may still
1838            // observe the shutdown flag and exit, but we have no
1839            // synchronous way from Drop to enumerate them. The ring-
1840            // buffer count is a lower bound on the stranded total.
1841            //
1842            // Use the non-blocking accessor: if `Drop` runs on a
1843            // thread that already holds a shard mutex (single-thread
1844            // runtime + panic during shutdown is the canonical
1845            // hazard), the blocking `total_pending_in_rings` would
1846            // self-deadlock. Best-effort accounting is the right
1847            // trade-off here — we'd rather under-report stranded
1848            // events than wedge the drop forever.
1849            let (stranded_in_rings, uncounted_shards) =
1850                self.shard_manager.try_total_pending_in_rings();
1851            if uncounted_shards > 0 {
1852                tracing::warn!(
1853                    uncounted_shards,
1854                    "EventBus::drop: {uncounted_shards} shard(s) were locked at \
1855                     drop time and could not be accounted for in stranded_in_rings"
1856                );
1857            }
1858            if stranded_in_rings > 0 {
1859                self.stats
1860                    .events_dropped
1861                    .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
1862                self.stats
1863                    .shutdown_was_lossy
1864                    .store(true, AtomicOrdering::Release);
1865            }
1866
1867            let stats = self.shard_manager.stats();
1868            tracing::warn!(
1869                events_ingested = stats.events_ingested,
1870                events_dropped = stats.events_dropped,
1871                stranded_in_rings,
1872                "EventBus dropped without an awaited shutdown(). Any in-flight \
1873                 events still in the ring buffers or batch channels will be lost \
1874                 — the adapter's flush()/shutdown() never ran. Call \
1875                 `bus.shutdown().await` before dropping for durable shutdown."
1876            );
1877        }
1878    }
1879}
1880
/// How long a second (CAS-losing) caller of `shutdown_via_ref`
/// waits for the first caller to finish before giving up.
/// Production builds use a fixed 10 seconds.
#[cfg(not(test))]
fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
    const PRODUCTION_SPIN_DEADLINE_SECS: u64 = 10;
    std::time::Duration::from_secs(PRODUCTION_SPIN_DEADLINE_SECS)
}
1887
/// Test-only override for the `shutdown_via_ref` spin deadline,
/// expressed in milliseconds. The default of `0` means "no
/// override — use the production 10s".
///
/// Written through
/// [`set_shutdown_via_ref_spin_deadline_for_test`] by tests that
/// want to reach the deadline-elapsed path without wall-clock-waiting
/// the full 10s.
///
/// The value is one process-wide atomic, shared by every test in
/// the `cargo test --lib` binary, so an override set by one test is
/// visible to any test running concurrently. Tests that use it MUST
/// hold the [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] mutex for their
/// whole set / read / reset window — see
/// [`set_shutdown_via_ref_spin_deadline_for_test`].
#[cfg(test)]
static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: AtomicU64 = AtomicU64::new(0);
1904
/// Async mutex that serializes tests which override the
/// `shutdown_via_ref` spin deadline, so concurrent tests can't
/// observe each other's transient override values.
///
/// Tests acquire it through [`shutdown_deadline_override_lock`] and
/// keep the guard alive until they have reset the override to 0.
///
/// A `tokio::sync::Mutex` is used instead of `std::sync::Mutex`
/// because the guard is meant to be held across `.await` points
/// for the duration of the guarded test; holding the std variant
/// across an await would (rightly, for production code) trip
/// `clippy::await_holding_lock`.
#[cfg(test)]
static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
1918
/// Lock [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] and hand back its guard.
///
/// A test that changes the deadline override keeps the returned
/// guard alive from before it sets the override until after it has
/// reset it, so every other test sees the default value.
#[cfg(test)]
pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
    SHUTDOWN_DEADLINE_OVERRIDE_GUARD.lock().await
}
1927
/// Test-build spin deadline for `shutdown_via_ref`: the override in
/// `SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS` when it is non-zero,
/// otherwise the same 10 seconds production uses.
#[cfg(test)]
fn shutdown_via_ref_spin_deadline() -> Duration {
    match SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(AtomicOrdering::Relaxed) {
        // 0 is the "no override" sentinel.
        0 => Duration::from_secs(10),
        override_ms => Duration::from_millis(override_ms),
    }
}
1937
/// Set the test-only spin-deadline override to `ms` milliseconds.
/// Passing `0` clears the override and restores the 10s default.
///
/// Callers are expected to hold the guard returned by
/// `shutdown_deadline_override_lock` while an override is in effect.
#[cfg(test)]
pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
    SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.store(ms, AtomicOrdering::Relaxed);
}
1942
1943async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
1944    // Refresh `interval` from the policy on every tick. The previous
1945    // version cached it once at task start, so any future runtime
1946    // policy update would not be adopted by the monitor without a
1947    // process restart. Today `EventBusConfig` is immutable
1948    // post-construction so this is a no-op — but reading it each tick
1949    // is cheap (one atomic / RwLock read) and removes the latent
1950    // footgun.
1951    loop {
1952        let interval = match weak.upgrade() {
1953            Some(bus) => match &bus.config.scaling {
1954                Some(p) => p.metrics_window,
1955                None => return,
1956            },
1957            None => return,
1958        };
1959        tokio::time::sleep(interval).await;
1960
1961        let bus = match weak.upgrade() {
1962            Some(b) => b,
1963            // Last strong ref dropped — caller is shutting down (or
1964            // already gone). Exit cleanly.
1965            None => break,
1966        };
1967
1968        // SeqCst to match the writer side (`EventBus::shutdown` /
1969        // `Drop`). The Acquire/Release handshake on
1970        // `drain_finalize_ready` already provides the load-bearing
1971        // happens-before today — but a future code change that
1972        // piggybacks on `shutdown`'s ordering (e.g. a producer that
1973        // observes shutdown without going through
1974        // `try_enter_ingest`) would silently break under Relaxed.
1975        // Aligning the read-side ordering with the writer-side
1976        // SeqCst is a one-instruction tax for the safety.
1977        if bus.shutdown.load(AtomicOrdering::SeqCst) {
1978            break;
1979        }
1980
1981        // Collect metrics for observability.
1982        if let Some(metrics) = bus.shard_manager.collect_metrics() {
1983            for m in &metrics {
1984                if m.fill_ratio > 0.5 {
1985                    tracing::debug!(
1986                        shard_id = m.shard_id,
1987                        fill_ratio = m.fill_ratio,
1988                        event_rate = m.event_rate,
1989                        "Shard metrics"
1990                    );
1991                }
1992            }
1993        }
1994
1995        // Evaluate scaling.
1996        match bus.shard_manager.evaluate_scaling() {
1997            ScalingDecision::ScaleUp(count) => {
1998                tracing::info!(count = count, "Scaling up shards");
1999                for _ in 0..count {
2000                    if let Err(e) = bus.add_shard_internal().await {
2001                        tracing::error!(error = %e, "Failed to add shard");
2002                        break;
2003                    }
2004                }
2005            }
2006            ScalingDecision::ScaleDown(count) => {
2007                tracing::info!(count = count, "Scaling down shards");
2008                if let Some(mapper) = bus.shard_manager.mapper() {
2009                    if let Ok(drained) = mapper.scale_down(count) {
2010                        for shard_id in drained {
2011                            let _ = bus.shard_manager.drain_shard(shard_id);
2012                        }
2013                    }
2014                }
2015            }
2016            ScalingDecision::None => {}
2017        }
2018
2019        if let Some(mapper) = bus.shard_manager.mapper() {
2020            let stopped = mapper.finalize_draining();
2021            for shard_id in stopped {
2022                let _ = bus.remove_shard_internal(shard_id).await;
2023            }
2024        }
2025
2026        // CRITICAL: drop the strong ref BEFORE the next sleep so a
2027        // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
2028        // the last strong ref while we're sleeping.
2029        drop(bus);
2030    }
2031}
2032
// Batch dispatch overview. This describes `dispatch_batch` (defined
// after `retry_backoff`), which each shard's batch worker calls for
// every batch it emits; it is kept as a plain comment so rustdoc does
// not attribute it to `retry_backoff`.
//
// Dispatch a batch to the adapter with timeout and optional retries.
// Returns true if the batch was accepted, false if all attempts failed.
//
// Non-retryable errors (e.g. `AdapterError::Connection`,
// `AdapterError::Fatal`, `AdapterError::Serialization`) skip the
// retry loop and drop the batch immediately. Retrying a fatal error
// just delays the inevitable while burning CPU and amplifying log
// noise. Use `AdapterError::is_retryable` as the single source of
// truth for this decision.

/// Delay to wait before the next `dispatch_batch` retry.
///
/// The nominal delay doubles with each attempt — 100, 200, 400, 800,
/// 1600, 3200 ms — and stops growing at attempt 5, so the wait never
/// exceeds the 3.2 s step (plus jitter). A flat delay would make every
/// shard's batch worker retry on the same cadence during a partial
/// backend outage, producing a synchronized retry storm while the
/// backend is trying to recover.
///
/// To decorrelate shards, the nominal delay is shifted by a jitter in
/// `[-25%, +25%)` derived from hashing `(shard_id, attempt)`. The
/// jitter is deterministic for a given shard and attempt but differs
/// between shards.
fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    const FIRST_DELAY_MS: u64 = 100;
    const MAX_DOUBLINGS: u32 = 5;

    // Nominal delay: FIRST_DELAY_MS × 2^min(attempt, MAX_DOUBLINGS).
    let doublings = attempt.min(MAX_DOUBLINGS);
    let nominal_ms: u64 = FIRST_DELAY_MS.saturating_mul(1u64 << doublings);

    // The uncapped attempt number feeds the hash, so attempts past the
    // cap still get distinct jitter.
    let seed = {
        let mut state = DefaultHasher::new();
        shard_id.hash(&mut state);
        attempt.hash(&mut state);
        state.finish()
    };

    // Pick an offset in [0, nominal/2) and re-centre it around zero.
    let window_ms = (nominal_ms / 2).max(1);
    let offset_ms = (seed % window_ms) as i64;
    let centred_ms = nominal_ms as i64 - (window_ms as i64 / 2) + offset_ms;

    Duration::from_millis(centred_ms.max(1) as u64)
}
2077
2078async fn dispatch_batch(
2079    adapter: &dyn Adapter,
2080    batch: Batch,
2081    shard_id: u16,
2082    timeout: Duration,
2083    retries: u32,
2084) -> bool {
2085    // Retry attempts clone the batch; the final attempt moves it, saving
2086    // one clone per dispatch (the common path is retries == 0).
2087    for attempt in 0..retries {
2088        match tokio::time::timeout(timeout, adapter.on_batch(batch.clone())).await {
2089            Ok(Ok(())) => return true,
2090            Ok(Err(e)) => {
2091                if !e.is_retryable() {
2092                    // Tag with a `reason` field so this
2093                    // distinct drop cause is separately filterable
2094                    // from retry-exhausted and timeout in
2095                    // observability tools. Shutdown is distinguished
2096                    // from generic non-retryable so an operator
2097                    // chasing "why are batches being dropped" can
2098                    // immediately tell a sequencing bug (sending to
2099                    // a stopped adapter) from a transport / config
2100                    // failure.
2101                    let reason = if e.is_shutdown() {
2102                        "adapter_shutdown"
2103                    } else {
2104                        "non_retryable"
2105                    };
2106                    tracing::error!(
2107                        shard_id,
2108                        error = %e,
2109                        attempt,
2110                        reason,
2111                        "Non-retryable error from adapter, dropping batch"
2112                    );
2113                    return false;
2114                }
2115                tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2116            }
2117            Err(_) => {
2118                tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2119            }
2120        }
2121        tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2122    }
2123
2124    // Pre-fix the final attempt collapsed every drop into
2125    // one log line ("Failed to dispatch batch, dropping"), making
2126    // it impossible to tell retry-exhausted from fatal-non-
2127    // retryable from timeout in metrics. The non-retryable case
2128    // already has its own log inside the retry loop above (early
2129    // return); here we tag retry-exhausted vs timeout-after-
2130    // retries with distinct `reason` fields so log-based
2131    // observability tools can break the drops out by cause.
2132    match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2133        Ok(Ok(())) => true,
2134        Ok(Err(e)) => {
2135            tracing::error!(
2136                shard_id,
2137                error = %e,
2138                reason = "retry_exhausted",
2139                attempts = retries + 1,
2140                "Failed to dispatch batch after exhausting retries, dropping"
2141            );
2142            false
2143        }
2144        Err(_) => {
2145            tracing::error!(
2146                shard_id,
2147                reason = "timeout",
2148                attempts = retries + 1,
2149                "Adapter on_batch timed out on final attempt, dropping batch"
2150            );
2151            false
2152        }
2153    }
2154}
2155
2156struct BatchWorkerParams {
2157    shard_id: u16,
2158    rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
2159    adapter: Arc<dyn Adapter>,
2160    shard_manager: Arc<ShardManager>,
2161    config: BatchConfig,
2162    adapter_timeout: Duration,
2163    batch_retries: u32,
2164    /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
2165    /// stores its post-flush sequence here on every dispatch so the
2166    /// bus can read it after the worker exits — see
2167    /// `ShardWorkers::next_sequence` for the consumer side.
2168    next_sequence: Arc<AtomicU64>,
2169    /// Bus-level stats. The worker increments
2170    /// `batches_dispatched` and `events_dispatched` after every
2171    /// successful `dispatch_batch`. Both must actually be
2172    /// incremented here, otherwise `flush()`'s Phase 2 progress
2173    /// probe would always observe zero progress and early-break
2174    /// after a single `max_delay` window — racing the
2175    /// BatchWorker's first `recv_timeout` and flaking on
2176    /// Windows-class timer resolution.
2177    stats: Arc<EventBusStats>,
2178    /// Producer nonce stamped on every batch the worker emits.
2179    /// Bus-loaded from the persistent path when
2180    /// `producer_nonce_path` is configured, otherwise from the
2181    /// per-process default.
2182    producer_nonce: u64,
2183}
2184
2185fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2186    let BatchWorkerParams {
2187        shard_id,
2188        mut rx,
2189        adapter,
2190        shard_manager,
2191        config,
2192        adapter_timeout,
2193        batch_retries,
2194        next_sequence,
2195        stats,
2196        producer_nonce,
2197    } = params;
2198    tokio::spawn(async move {
2199        let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2200
2201        loop {
2202            // Wait for events with timeout. The batch worker exits
2203            // only when its channel is closed — i.e. after every
2204            // upstream sender (the drain worker for this shard +
2205            // `EventBus::batch_senders`) has been dropped.
2206            // `EventBus::shutdown` enforces that ordering so no
2207            // event is left stranded in the channel.
2208            let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2209
2210            match tokio::time::timeout(recv_timeout, rx.recv()).await {
2211                Ok(Some(events)) => {
2212                    if let Some(batch) = worker.add_events(events) {
2213                        let batch_len = batch.len() as u64;
2214                        if dispatch_batch(
2215                            &*adapter,
2216                            batch,
2217                            shard_id,
2218                            adapter_timeout,
2219                            batch_retries,
2220                        )
2221                        .await
2222                        {
2223                            stats
2224                                .batches_dispatched
2225                                .fetch_add(1, AtomicOrdering::Relaxed);
2226                            stats
2227                                .events_dispatched
2228                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2229                            if let Some(shard_ref) = shard_manager.shard(shard_id) {
2230                                shard_ref.lock().record_batch_dispatch();
2231                            }
2232                        }
2233                    }
2234                }
2235                Ok(None) => {
2236                    // Channel closed — drain any pending and exit.
2237                    if worker.has_pending() {
2238                        let batch = worker.flush();
2239                        if !batch.is_empty() {
2240                            let batch_len = batch.len() as u64;
2241                            if dispatch_batch(
2242                                &*adapter,
2243                                batch,
2244                                shard_id,
2245                                adapter_timeout,
2246                                batch_retries,
2247                            )
2248                            .await
2249                            {
2250                                stats
2251                                    .batches_dispatched
2252                                    .fetch_add(1, AtomicOrdering::Relaxed);
2253                                stats
2254                                    .events_dispatched
2255                                    .fetch_add(batch_len, AtomicOrdering::Relaxed);
2256                            }
2257                        }
2258                    }
2259                    break;
2260                }
2261                Err(_) => {
2262                    // Timeout - check if we need to flush
2263                    if let Some(batch) = worker.add_events(vec![]) {
2264                        let batch_len = batch.len() as u64;
2265                        if dispatch_batch(
2266                            &*adapter,
2267                            batch,
2268                            shard_id,
2269                            adapter_timeout,
2270                            batch_retries,
2271                        )
2272                        .await
2273                        {
2274                            stats
2275                                .batches_dispatched
2276                                .fetch_add(1, AtomicOrdering::Relaxed);
2277                            stats
2278                                .events_dispatched
2279                                .fetch_add(batch_len, AtomicOrdering::Relaxed);
2280                            if let Some(shard_ref) = shard_manager.shard(shard_id) {
2281                                shard_ref.lock().record_batch_dispatch();
2282                            }
2283                        }
2284                    }
2285                }
2286            }
2287        }
2288    })
2289}
2290
/// Longest a drain worker will wait for `drain_finalize_ready` once
/// it has seen `shutdown=true`.
///
/// This is defense in depth: a caller that drops the bus
/// mid-shutdown without setting the gate must not leave the worker
/// pinned forever. The shutdown path *always* sets the gate (even
/// when it hits its own timeout), so in normal operation this
/// deadline is never reached.
const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_secs(10);
2298
2299/// Spawn a drain worker for a single shard.
2300///
2301/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle
2302/// allocation happens *outside* the shard mutex critical section.
2303/// Each cycle: lock → drain into scratch (no alloc, capacity already
2304/// reserved) → unlock → `mem::replace` swaps the filled scratch out
2305/// for a fresh empty `Vec` (alloc *outside* the lock) → send the
2306/// filled batch on the channel.
2307fn spawn_drain_worker_for_shard(
2308    shard_id: u16,
2309    shard_manager: Arc<ShardManager>,
2310    sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
2311    shutdown: Arc<AtomicBool>,
2312    drain_finalize_ready: Arc<AtomicBool>,
2313) -> JoinHandle<()> {
2314    const STEADY_BATCH: usize = 1_000;
2315    const FINAL_BATCH: usize = 10_000;
2316
2317    tokio::spawn(async move {
2318        let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);
2319
2320        loop {
2321            // SeqCst to match the writer side (`EventBus::shutdown` /
2322            // `Drop`). `try_enter_ingest` itself uses SeqCst, and
2323            // the Acquire/Release handshake on
2324            // `drain_finalize_ready` (below) is what actually makes
2325            // the producer-push happen-before visible. Aligning to
2326            // SeqCst here makes the contract robust to future
2327            // producer-side changes that might piggyback on
2328            // `shutdown`'s ordering.
2329            if shutdown.load(AtomicOrdering::SeqCst) {
2330                // Before doing the final sweep, wait for `shutdown()`
2331                // to release the finalize gate. The gate is set only
2332                // after the in-flight ingest counter reaches zero,
2333                // which means every producer that read `shutdown=false`
2334                // has completed its push. Without this wait, the drain
2335                // worker can race ahead of a late push under
2336                // shard-mutex serialization (drain takes the lock
2337                // first, sees nothing, exits; producer then takes the
2338                // lock and pushes — event stranded).
2339                //
2340                // Acquire pairs with the Release in `EventBus::shutdown`
2341                // and `EventBus::drop`, transitively making every
2342                // producer push that happened-before its `in_flight`
2343                // decrement visible to the subsequent `pop_batch_into`.
2344                // `tokio::time::Instant` so virtualized-clock tests
2345                // (`tokio::time::pause`) advance the deadline via
2346                // `sleep` rather than spinning until wall-clock catches
2347                // up.
2348                let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
2349                while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
2350                    if tokio::time::Instant::now() >= finalize_deadline {
2351                        tracing::warn!(
2352                            shard_id,
2353                            "drain worker timed out waiting for finalize gate; \
2354                             proceeding with potential event loss"
2355                        );
2356                        break;
2357                    }
2358                    // Park instead of `yield_now` so we don't
2359                    // starve the workers / producers we're waiting
2360                    // on under contention.
2361                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
2362                }
2363
2364                // Final drain: loop until the ring buffer is empty.
2365                // A single 10k batch is not enough — the ring
2366                // buffer can hold up to `ring_buffer_capacity`
2367                // events (default 1M) and any leftover would be
2368                // silently lost on shutdown.
2369                //
2370                // Pre-fix this broke at the first
2371                // `popped == 0`. The audit posited a narrow race
2372                // where a producer that fetch_add'd
2373                // in_flight_ingests but stalled before the
2374                // shard-lock body could push AFTER shutdown
2375                // observed in_flight=0 yet BEFORE this final
2376                // sweep saw the event. The SeqCst guard pattern
2377                // makes this unlikely (the push happens-before
2378                // the guard drop), but the defense is cheap:
2379                // require TWO consecutive zero-event passes
2380                // before declaring drain. The yield_now between
2381                // them gives a stalled producer a chance to land
2382                // the push.
2383                let mut final_scratch: Vec<crate::event::InternalEvent> =
2384                    Vec::with_capacity(FINAL_BATCH);
2385                let mut consecutive_zeros = 0u32;
2386                loop {
2387                    let popped = shard_manager
2388                        .with_shard(shard_id, |shard| {
2389                            shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
2390                        })
2391                        .unwrap_or(0);
2392                    if popped == 0 {
2393                        consecutive_zeros += 1;
2394                        if consecutive_zeros >= 2 {
2395                            break;
2396                        }
2397                        // Yield to let any racing producer commit
2398                        // its push, then re-poll.
2399                        tokio::task::yield_now().await;
2400                        continue;
2401                    }
2402                    consecutive_zeros = 0;
2403                    let batch =
2404                        std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
2405                    let batch_len = batch.len();
2406                    if let Err(_send_err) = sender.send(batch).await {
2407                        // Batch worker exited before drain. The
2408                        // `mem::replace` already pulled events out
2409                        // of the ring buffer, so the dropped batch
2410                        // is unrecoverable — the SendError carries
2411                        // it back but the consumer is gone. Surface
2412                        // the count loudly so the loss is
2413                        // observable in operator dashboards rather
2414                        // than a silent miss in shutdown stats.
2415                        tracing::error!(
2416                            shard_id,
2417                            dropped = batch_len,
2418                            "drain worker (final): batch worker dropped \
2419                             channel before final drain completed; \
2420                             events removed from ring buffer cannot be redelivered",
2421                        );
2422                        break;
2423                    }
2424                }
2425                break;
2426            }
2427
2428            // Drain events from ring buffer.
2429            let popped = shard_manager.with_shard(shard_id, |shard| {
2430                shard.pop_batch_into(&mut scratch, STEADY_BATCH)
2431            });
2432
2433            match popped {
2434                Some(0) => {
2435                    // No events — yield briefly. The 100μs sleep is deliberate:
2436                    // this is a latency-first system where the drain loop is the
2437                    // hot path. Longer backoff would add milliseconds of latency
2438                    // to the first event after a quiet period, violating the
2439                    // sub-microsecond design target. The CPU cost of 100μs polling
2440                    // is acceptable for a system that processes 10M+ events/sec.
2441                    tokio::time::sleep(Duration::from_micros(100)).await;
2442                }
2443                Some(_) => {
2444                    let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
2445                    let batch_len = batch.len();
2446                    if let Err(_send_err) = sender.send(batch).await {
2447                        // Steady-state: the only way the batch
2448                        // worker drops the channel is if it
2449                        // panicked or `remove_shard_internal`
2450                        // tore it down out of order with the
2451                        // drain worker (which the documented
2452                        // shutdown sequence forbids). Either way,
2453                        // the events are unrecoverable — the
2454                        // `mem::replace` above already pulled them
2455                        // out of the ring buffer. Pre-fix this
2456                        // simply `break`-d, leaving the loss
2457                        // invisible. Surface a loud error with
2458                        // the dropped count so an out-of-order
2459                        // shutdown or batch-worker panic shows up
2460                        // in dashboards rather than as a silent
2461                        // metric gap.
2462                        tracing::error!(
2463                            shard_id,
2464                            dropped = batch_len,
2465                            "drain worker: batch worker dropped channel \
2466                             during steady-state drain; events removed from \
2467                             ring buffer cannot be redelivered",
2468                        );
2469                        break;
2470                    }
2471                }
2472                None => {
2473                    // Shard no longer exists (was removed)
2474                    break;
2475                }
2476            }
2477        }
2478    })
2479}
2480
2481#[cfg(test)]
2482mod tests {
2483    use super::*;
2484    use crate::shard::ScalingPolicy;
2485    use serde_json::json;
2486
2487    #[tokio::test]
2488    async fn test_event_bus_basic() {
2489        let config = EventBusConfig::builder()
2490            .num_shards(2)
2491            .ring_buffer_capacity(1024)
2492            .build()
2493            .unwrap();
2494
2495        let bus = EventBus::new(config).await.unwrap();
2496
2497        // Ingest some events
2498        for i in 0..10 {
2499            let event = Event::new(json!({"index": i}));
2500            bus.ingest(event).unwrap();
2501        }
2502
2503        // Give workers time to process
2504        tokio::time::sleep(Duration::from_millis(100)).await;
2505
2506        // Check stats
2507        assert_eq!(
2508            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2509            10
2510        );
2511
2512        bus.shutdown().await.unwrap();
2513    }
2514
2515    #[tokio::test]
2516    async fn test_event_bus_batch_ingest() {
2517        let config = EventBusConfig::default();
2518        let bus = EventBus::new(config).await.unwrap();
2519
2520        let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2521
2522        let ingested = bus.ingest_batch(events);
2523        assert_eq!(ingested, 100);
2524
2525        bus.shutdown().await.unwrap();
2526    }
2527
2528    #[tokio::test]
2529    async fn test_event_bus_with_dynamic_scaling() {
2530        let policy = ScalingPolicy {
2531            min_shards: 2,
2532            max_shards: 8,
2533            ..Default::default()
2534        };
2535
2536        let config = EventBusConfig::builder()
2537            .num_shards(2)
2538            .ring_buffer_capacity(1024)
2539            .scaling(policy)
2540            .build()
2541            .unwrap();
2542
2543        let bus = EventBus::new(config).await.unwrap();
2544
2545        // Verify dynamic scaling is enabled
2546        assert!(bus.is_dynamic_scaling_enabled());
2547        assert_eq!(bus.num_shards(), 2);
2548
2549        // Ingest some events
2550        for i in 0..100 {
2551            let event = Event::new(json!({"index": i}));
2552            bus.ingest(event).unwrap();
2553        }
2554
2555        // Give workers time to process
2556        tokio::time::sleep(Duration::from_millis(100)).await;
2557
2558        // Check stats
2559        assert_eq!(
2560            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2561            100
2562        );
2563
2564        bus.shutdown().await.unwrap();
2565    }
2566
2567    #[tokio::test]
2568    async fn test_manual_scale_up() {
2569        let policy = ScalingPolicy {
2570            min_shards: 2,
2571            max_shards: 8,
2572            cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2573            ..Default::default()
2574        };
2575
2576        let config = EventBusConfig::builder()
2577            .num_shards(2)
2578            .ring_buffer_capacity(1024)
2579            .scaling(policy)
2580            .build()
2581            .unwrap();
2582
2583        let bus = EventBus::new(config).await.unwrap();
2584
2585        assert_eq!(bus.num_shards(), 2);
2586
2587        // Manually scale up
2588        let new_ids = bus.manual_scale_up(2).await.unwrap();
2589        assert_eq!(new_ids.len(), 2);
2590        assert_eq!(bus.num_shards(), 4);
2591
2592        // Ingest events - they should be distributed across all shards
2593        for i in 0..100 {
2594            let event = Event::new(json!({"index": i}));
2595            bus.ingest(event).unwrap();
2596        }
2597
2598        tokio::time::sleep(Duration::from_millis(100)).await;
2599
2600        assert_eq!(
2601            bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2602            100
2603        );
2604
2605        bus.shutdown().await.unwrap();
2606    }
2607
2608    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2609    /// `manual_scale_down` only called `mapper.scale_down(count)`,
2610    /// which marks shards as `Draining` but does NOT finalize them.
2611    /// Bus configs without an active scaling monitor (or callers
2612    /// shutting down before the monitor's next tick) lost any
2613    /// events queued in the drained shards' ring buffers because
2614    /// `remove_shard_internal` was never invoked. The fix runs the
2615    /// full lifecycle synchronously: scale_down → poll for empty →
2616    /// finalize_draining → remove_shard_internal.
2617    ///
2618    /// We pin this by scaling up, manually scaling down, and
2619    /// asserting that `num_shards` actually decreases — pre-fix
2620    /// the count would still reflect the Draining shards.
2621    #[tokio::test]
2622    async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2623        let policy = ScalingPolicy {
2624            min_shards: 2,
2625            max_shards: 8,
2626            cooldown: Duration::from_nanos(1),
2627            ..Default::default()
2628        };
2629        let config = EventBusConfig::builder()
2630            .num_shards(2)
2631            .ring_buffer_capacity(1024)
2632            .scaling(policy)
2633            .build()
2634            .unwrap();
2635        let bus = EventBus::new(config).await.unwrap();
2636
2637        // Scale up to 4, then back down to 2.
2638        let added = bus.manual_scale_up(2).await.unwrap();
2639        assert_eq!(added.len(), 2);
2640        assert_eq!(bus.num_shards(), 4);
2641
2642        let removed = bus.manual_scale_down(2).await.unwrap();
2643        assert_eq!(
2644            removed.len(),
2645            2,
2646            "manual_scale_down must complete the lifecycle for both \
2647             requested shards (mark Draining → wait → finalize → remove)"
2648        );
2649
2650        // Pre-fix: `num_shards` would still be 4 because shards
2651        // were only marked Draining (and the routing-table removal
2652        // path never ran). Post-fix: it's back to 2.
2653        assert_eq!(
2654            bus.num_shards(),
2655            2,
2656            "drained shards must be removed from the routing table"
2657        );
2658
2659        bus.shutdown().await.unwrap();
2660    }
2661
2662    #[tokio::test]
2663    async fn test_shard_metrics() {
2664        let policy = ScalingPolicy::default();
2665
2666        let config = EventBusConfig::builder()
2667            .num_shards(2)
2668            .ring_buffer_capacity(1024)
2669            .scaling(policy)
2670            .build()
2671            .unwrap();
2672
2673        let bus = EventBus::new(config).await.unwrap();
2674
2675        // Ingest some events
2676        for i in 0..50 {
2677            let event = Event::new(json!({"index": i}));
2678            bus.ingest(event).unwrap();
2679        }
2680
2681        // Get metrics
2682        let metrics = bus.shard_metrics();
2683        assert!(metrics.is_some());
2684        let metrics = metrics.unwrap();
2685        assert_eq!(metrics.len(), 2);
2686
2687        bus.shutdown().await.unwrap();
2688    }
2689
2690    #[tokio::test]
2691    async fn test_regression_eventbus_drop_signals_shutdown() {
2692        // Regression: dropping an EventBus without calling shutdown() used to
2693        // leave background tasks running indefinitely. The Drop impl now sets
2694        // the shutdown flag so workers eventually exit.
2695        let result = tokio::time::timeout(Duration::from_secs(5), async {
2696            let config = EventBusConfig::builder()
2697                .num_shards(2)
2698                .ring_buffer_capacity(1024)
2699                .build()
2700                .unwrap();
2701
2702            let bus = EventBus::new(config).await.unwrap();
2703
2704            // Ingest some events
2705            for i in 0..10 {
2706                let event = Event::new(json!({"index": i}));
2707                bus.ingest(event).unwrap();
2708            }
2709
2710            // Drop without calling shutdown()
2711            drop(bus);
2712
2713            // If we reach here, the drop didn't hang
2714        })
2715        .await;
2716
2717        assert!(
2718            result.is_ok(),
2719            "EventBus drop should not hang — Drop impl must signal shutdown"
2720        );
2721    }
2722
2723    #[tokio::test]
2724    async fn test_with_dynamic_scaling_builder() {
2725        let config = EventBusConfig::builder()
2726            .num_shards(4)
2727            .ring_buffer_capacity(2048)
2728            .with_dynamic_scaling()
2729            .build()
2730            .unwrap();
2731
2732        let bus = EventBus::new(config).await.unwrap();
2733
2734        assert!(bus.is_dynamic_scaling_enabled());
2735        assert_eq!(bus.num_shards(), 4);
2736
2737        bus.shutdown().await.unwrap();
2738    }
2739
2740    /// Mock adapter that counts `on_batch` invocations and returns a
2741    /// configurable error variant. Used to assert dispatch retry
2742    /// semantics without dragging in a real adapter.
2743    struct CountingErrAdapter {
2744        calls: Arc<std::sync::atomic::AtomicU32>,
2745        make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
2746    }
2747
2748    #[async_trait::async_trait]
2749    impl crate::adapter::Adapter for CountingErrAdapter {
2750        async fn init(&mut self) -> Result<(), AdapterError> {
2751            Ok(())
2752        }
2753        async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2754            self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2755            Err((self.make_err)())
2756        }
2757        async fn flush(&self) -> Result<(), AdapterError> {
2758            Ok(())
2759        }
2760        async fn shutdown(&self) -> Result<(), AdapterError> {
2761            Ok(())
2762        }
2763        async fn poll_shard(
2764            &self,
2765            _shard_id: u16,
2766            _from_id: Option<&str>,
2767            _limit: usize,
2768        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2769            Ok(crate::adapter::ShardPollResult::empty())
2770        }
2771        fn name(&self) -> &'static str {
2772            "counting_err"
2773        }
2774        async fn is_healthy(&self) -> bool {
2775            true
2776        }
2777    }
2778
2779    /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
2780    /// retried every error variant, ignoring `AdapterError::is_retryable`.
2781    /// A non-retryable error (Connection / Fatal / Serialization)
2782    /// should now drop the batch immediately rather than burn the
2783    /// retry budget on something that cannot succeed.
2784    #[tokio::test(start_paused = true)]
2785    async fn dispatch_batch_skips_retries_on_non_retryable_error() {
2786        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2787        let adapter = CountingErrAdapter {
2788            calls: calls.clone(),
2789            make_err: Box::new(|| AdapterError::Connection("refused".into())),
2790        };
2791
2792        let batch = Batch::new(0, vec![], 0);
2793        let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
2794
2795        assert!(!ok, "non-retryable error must drop batch");
2796        assert_eq!(
2797            calls.load(std::sync::atomic::Ordering::SeqCst),
2798            1,
2799            "Connection error must not be retried; expected exactly 1 on_batch call"
2800        );
2801    }
2802
2803    /// Sanity: a retryable error *does* go through the full retry
2804    /// budget. Without this companion check, the previous test could
2805    /// pass for the wrong reason (e.g. if dispatch always returned on
2806    /// the first error).
2807    #[tokio::test(start_paused = true)]
2808    async fn dispatch_batch_retries_transient_errors() {
2809        let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2810        let adapter = CountingErrAdapter {
2811            calls: calls.clone(),
2812            make_err: Box::new(|| AdapterError::Transient("temp".into())),
2813        };
2814
2815        let batch = Batch::new(0, vec![], 0);
2816        let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 3).await;
2817
2818        assert!(!ok);
2819        // 3 retries + 1 final attempt = 4 total calls.
2820        assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
2821    }
2822
2823    /// Counting adapter that records the number of events delivered via
2824    /// `on_batch`. Used by shutdown-durability tests below.
2825    struct CountingAdapter {
2826        received: Arc<AtomicU64>,
2827    }
2828
2829    #[async_trait::async_trait]
2830    impl crate::adapter::Adapter for CountingAdapter {
2831        async fn init(&mut self) -> Result<(), AdapterError> {
2832            Ok(())
2833        }
2834        async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
2835            self.received
2836                .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
2837            Ok(())
2838        }
2839        async fn flush(&self) -> Result<(), AdapterError> {
2840            Ok(())
2841        }
2842        async fn shutdown(&self) -> Result<(), AdapterError> {
2843            Ok(())
2844        }
2845        async fn poll_shard(
2846            &self,
2847            _shard_id: u16,
2848            _from_id: Option<&str>,
2849            _limit: usize,
2850        ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2851            Ok(crate::adapter::ShardPollResult::empty())
2852        }
2853        fn name(&self) -> &'static str {
2854            "counting"
2855        }
2856        async fn is_healthy(&self) -> bool {
2857            true
2858        }
2859    }
2860
2861    /// `retry_backoff` exponentially grows the base delay
2862    /// per attempt and adds per-(shard, attempt) jitter to
2863    /// decorrelate retries across shards. Pin both invariants:
2864    /// monotonic growth on the base, and jitter that produces
2865    /// different outputs for different shard ids.
2866    #[test]
2867    fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
2868        // Shard 0 attempt 0..6: base ms 100, 200, 400, 800, 1600,
2869        // 3200, 3200 (cap). Plus ±25% jitter.
2870        let s0_a0 = retry_backoff(0, 0).as_millis();
2871        let s0_a1 = retry_backoff(0, 1).as_millis();
2872        let s0_a4 = retry_backoff(0, 4).as_millis();
2873        let s0_a5 = retry_backoff(0, 5).as_millis();
2874        let s0_a6 = retry_backoff(0, 6).as_millis();
2875
2876        // Bounds: each attempt's base is in `[base*0.75, base*1.25)`.
2877        assert!((75..=125).contains(&s0_a0));
2878        assert!((150..=250).contains(&s0_a1));
2879        assert!((1200..=2000).contains(&s0_a4));
2880        assert!((2400..=4000).contains(&s0_a5));
2881        // Cap at attempt=5: attempt 6 must NOT exceed attempt 5's
2882        // upper bound.
2883        assert!(
2884            s0_a6 <= 4000,
2885            "attempt > 5 must cap at the attempt-5 base; got {}ms",
2886            s0_a6
2887        );
2888
2889        // Jitter property: different shards at the same
2890        // attempt land on different backoffs. Sample 16 distinct
2891        // shard ids and assert at least 4 unique backoff values.
2892        //
2893        // The bound is deliberately loose (4 / 16) because
2894        // `DefaultHasher`'s exact distribution is **not stable**
2895        // across Rust toolchain versions — a tighter check (e.g.
2896        // ≥ 8) would empirically pass on every toolchain we test
2897        // against today, but a future stdlib change to the hasher
2898        // could shift the distribution and flake CI for a property
2899        // (decorrelation across shards) that doesn't actually
2900        // depend on a high collision-resistance bar. Asserting
2901        // ≥ 4 unique values out of 16 is enough to catch a real
2902        // regression (e.g. accidentally hashing only `attempt`
2903        // and not `shard_id` would collapse all 16 to a single
2904        // value) while staying robust to hasher-distribution
2905        // drift.
2906        use std::collections::HashSet;
2907        let s_attempt2: HashSet<u128> = (0u16..16)
2908            .map(|s| retry_backoff(s, 2).as_millis())
2909            .collect();
2910        assert!(
2911            s_attempt2.len() >= 4,
2912            "jitter must decorrelate retries across shards; \
2913             only {} unique backoffs across 16 shards",
2914            s_attempt2.len()
2915        );
2916    }
2917
2918    /// CR-23: pin that `EventBus::shutdown` actually invokes the
2919    /// adapter's `flush()` and `shutdown()` methods. The existing
2920    /// `sdk/tests/shutdown_regression.rs` covers the
2921    /// "shutdown runs even with outstanding Arc clones" property
2922    /// using a memory adapter whose `flush`/`shutdown` are no-ops
2923    /// — so a regression that elided the adapter calls would still
2924    /// pass. This test uses a recording adapter that increments
2925    /// per-method counters; we assert flush AND shutdown both fired
2926    /// exactly once after a clean `bus.shutdown().await`.
2927    ///
2928    /// The fix routes `Net::shutdown` through
2929    /// `shutdown_via_ref`, which in turn calls
2930    /// `self.adapter.flush()` and `self.adapter.shutdown()` once
2931    /// each. CR-23 pins this contract at the bus layer so an
2932    /// inadvertent regression at the SDK or adapter wrapper layer
2933    /// can be caught without an integration setup.
2934    #[tokio::test]
2935    async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
2936        struct RecordingAdapter {
2937            on_batch_calls: Arc<AtomicU64>,
2938            flush_calls: Arc<AtomicU64>,
2939            shutdown_calls: Arc<AtomicU64>,
2940        }
2941
2942        #[async_trait::async_trait]
2943        impl crate::adapter::Adapter for RecordingAdapter {
2944            async fn init(&mut self) -> Result<(), AdapterError> {
2945                Ok(())
2946            }
2947            async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2948                self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
2949                Ok(())
2950            }
2951            async fn flush(&self) -> Result<(), AdapterError> {
2952                self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
2953                Ok(())
2954            }
2955            async fn shutdown(&self) -> Result<(), AdapterError> {
2956                self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
2957                Ok(())
2958            }
2959            async fn poll_shard(
2960                &self,
2961                _shard_id: u16,
2962                _from_id: Option<&str>,
2963                _limit: usize,
2964            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2965                Ok(crate::adapter::ShardPollResult::empty())
2966            }
2967            fn name(&self) -> &'static str {
2968                "cr23-recording"
2969            }
2970            async fn is_healthy(&self) -> bool {
2971                true
2972            }
2973        }
2974
2975        let on_batch = Arc::new(AtomicU64::new(0));
2976        let flush = Arc::new(AtomicU64::new(0));
2977        let shutdown = Arc::new(AtomicU64::new(0));
2978        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
2979            on_batch_calls: on_batch.clone(),
2980            flush_calls: flush.clone(),
2981            shutdown_calls: shutdown.clone(),
2982        });
2983
2984        let config = EventBusConfig::builder()
2985            .num_shards(2)
2986            .ring_buffer_capacity(1024)
2987            .build()
2988            .unwrap();
2989        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
2990
2991        // Drive a small burst so on_batch fires at least once —
2992        // pins that the adapter is wired up correctly. The
2993        // load-bearing assertions below are on flush and shutdown.
2994        for i in 0..16 {
2995            let _ = bus.ingest(Event::new(json!({"i": i})));
2996        }
2997
2998        // Pre-CR-23 a regression that elided one of these would
2999        // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3000        // because the memory adapter's flush/shutdown are no-ops.
3001        // Here the recording adapter makes the contract observable.
3002        bus.shutdown().await.unwrap();
3003
3004        assert!(
3005            on_batch.load(AtomicOrdering::SeqCst) > 0,
3006            "sanity: on_batch must have fired at least once"
3007        );
3008        assert_eq!(
3009            flush.load(AtomicOrdering::SeqCst),
3010            1,
3011            "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3012        );
3013        assert_eq!(
3014            shutdown.load(AtomicOrdering::SeqCst),
3015            1,
3016            "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3017        );
3018    }
3019
3020    /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3021    /// CAS loses (because a first caller already flipped the
3022    /// `shutdown` flag) and whose deadline elapses BEFORE the
3023    /// first caller sets `shutdown_completed=true` returns
3024    /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3025    ///
3026    /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3027    /// the CAS race had no way to distinguish "first caller
3028    /// finished shutdown" from "deadline timed out mid-shutdown."
3029    /// Under a slow adapter (`adapter_timeout` default 30s >
3030    /// the 10s spin deadline), the second caller silently saw
3031    /// `Ok` while the bus was still mid-shutdown — letting
3032    /// subsequent code observe a partially-shut-down bus.
3033    ///
3034    /// We use a slow first caller (sleeps inside `flush()`)
3035    /// and override the spin deadline to a few ms so the test
3036    /// runs fast.
3037    #[tokio::test]
3038    async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3039        struct SlowFlushAdapter {
3040            // Block flush() for this long. The first caller
3041            // gets stuck here while the second caller's spin
3042            // deadline elapses.
3043            flush_delay: std::time::Duration,
3044        }
3045
3046        #[async_trait::async_trait]
3047        impl crate::adapter::Adapter for SlowFlushAdapter {
3048            async fn init(&mut self) -> Result<(), AdapterError> {
3049                Ok(())
3050            }
3051            async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
3052                Ok(())
3053            }
3054            async fn flush(&self) -> Result<(), AdapterError> {
3055                tokio::time::sleep(self.flush_delay).await;
3056                Ok(())
3057            }
3058            async fn shutdown(&self) -> Result<(), AdapterError> {
3059                Ok(())
3060            }
3061            async fn poll_shard(
3062                &self,
3063                _shard_id: u16,
3064                _from_id: Option<&str>,
3065                _limit: usize,
3066            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3067                Ok(crate::adapter::ShardPollResult::empty())
3068            }
3069            fn name(&self) -> &'static str {
3070                "cr25-slow-flush"
3071            }
3072            async fn is_healthy(&self) -> bool {
3073                true
3074            }
3075        }
3076
3077        // Cubic P2: serialize access to the global deadline
3078        // override so concurrent tests don't interfere. Hold the
3079        // guard until the override is reset to 0 below.
3080        let _override_guard = super::shutdown_deadline_override_lock().await;
3081
3082        // Override the second-caller spin deadline to a short
3083        // value so the test doesn't wall-clock-wait 10s. Production
3084        // builds use the 10s default (the cfg(test) override is
3085        // compiled out).
3086        super::set_shutdown_via_ref_spin_deadline_for_test(50);
3087
3088        let config = EventBusConfig::builder()
3089            .num_shards(2)
3090            .ring_buffer_capacity(1024)
3091            .adapter_timeout(std::time::Duration::from_secs(5))
3092            .build()
3093            .unwrap();
3094        // First caller's flush() sleeps 500ms — far longer than
3095        // the 50ms spin deadline.
3096        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3097            flush_delay: std::time::Duration::from_millis(500),
3098        });
3099        let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3100
3101        // Spawn the FIRST caller — it wins the CAS and proceeds
3102        // into the slow flush. We don't await it; we want it
3103        // running in parallel.
3104        let bus_first = Arc::clone(&bus);
3105        let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3106
3107        // Cubic P2: poll `is_shutdown()` until the first caller
3108        // has set the flag, instead of a fixed sleep. This makes
3109        // the test scheduler-independent — we proceed as soon as
3110        // the first caller has won the CAS, regardless of how
3111        // tokio happens to schedule things. Bounded by a 1s
3112        // timeout so a regression that prevents `shutdown_via_ref`
3113        // from setting the flag doesn't hang the test.
3114        tokio::time::timeout(std::time::Duration::from_secs(1), async {
3115            while !bus.is_shutdown() {
3116                tokio::task::yield_now().await;
3117            }
3118        })
3119        .await
3120        .expect("first caller did not set shutdown flag within 1s");
3121
3122        // The second caller's CAS will fail; it enters the spin
3123        // and times out at 50ms.
3124        let start = std::time::Instant::now();
3125        let second_result = bus.shutdown_via_ref().await;
3126        let elapsed = start.elapsed();
3127
3128        // Reset the override for other tests, then drop the guard.
3129        super::set_shutdown_via_ref_spin_deadline_for_test(0);
3130
3131        // CR-25 contract: the second caller MUST get a Transient
3132        // error, not a silent Ok.
3133        let err = second_result.expect_err(
3134            "CR-25 regression: second caller MUST surface AdapterError::Transient \
3135             when its deadline elapses, NOT a silent Ok",
3136        );
3137        match err {
3138            AdapterError::Transient(msg) => {
3139                assert!(
3140                    msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3141                    "error message must reference the deadline path; got: {msg}"
3142                );
3143            }
3144            other => panic!("expected Transient, got {:?}", other),
3145        }
3146
3147        // Sanity: the second caller's elapsed time is bounded by
3148        // the override (50ms) + scheduler slop. If this is
3149        // anywhere near the production 10s, the cfg(test)
3150        // override path broke.
3151        assert!(
3152            elapsed < std::time::Duration::from_secs(5),
3153            "second caller took {elapsed:?}; the cfg(test) deadline override \
3154             broke if this is near the 10s production default"
3155        );
3156
3157        // Wait for the first caller to finish. Even though it
3158        // took the slow path, the bus IS shutting down and will
3159        // eventually complete.
3160        let _ = first_caller.await.unwrap();
3161        assert!(bus.is_shutdown_completed());
3162    }
3163
3164    /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3165    /// successfully-ingested event to the adapter before returning.
3166    /// Pins the broader durability contract that the
3167    /// `drain_finalize_ready` gate supports: the drain worker may not
3168    /// finalize until the in-flight wait completes.
3169    ///
3170    /// Tests across many shards with bursts large enough that the
3171    /// drain workers are mid-loop when shutdown begins.
3172    #[tokio::test]
3173    async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3174        let received = Arc::new(AtomicU64::new(0));
3175        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3176            received: received.clone(),
3177        });
3178
3179        let config = EventBusConfig::builder()
3180            .num_shards(4)
3181            .ring_buffer_capacity(4096)
3182            .build()
3183            .unwrap();
3184        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3185
3186        // Drive a sizable burst across all shards. Capacity > burst so
3187        // we don't trip backpressure; every successful Ok must reach
3188        // `on_batch` before shutdown returns.
3189        let total = 10_000usize;
3190        let mut successes = 0u64;
3191        for i in 0..total {
3192            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3193                successes += 1;
3194            }
3195        }
3196
3197        // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3198        // those workers wait on `drain_finalize_ready` after observing
3199        // `shutdown=true`, so any push the producer made before the
3200        // shutdown flag is guaranteed to be in the ring buffer when
3201        // the final sweep runs.
3202        bus.shutdown().await.unwrap();
3203
3204        let delivered = received.load(AtomicOrdering::SeqCst);
3205        assert_eq!(
3206            delivered, successes,
3207            "shutdown stranded events: {successes} ingested successfully, \
3208             only {delivered} reached the adapter"
3209        );
3210    }
3211
3212    /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3213    /// barrier: after it returns successfully, every event the
3214    /// caller successfully ingested before `flush()` was called
3215    /// must have been handed to the adapter via `on_batch`.
3216    /// The previous implementation slept a single `batch.max_delay`
3217    /// after the ring buffers drained, which left a window where
3218    /// events could still be sitting in the per-shard mpsc channel
3219    /// or inside a partially-filled batch awaiting timeout — those
3220    /// events were silently dropped from the flush guarantee.
3221    #[tokio::test]
3222    async fn flush_is_a_delivery_barrier() {
3223        let received = Arc::new(AtomicU64::new(0));
3224        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3225            received: received.clone(),
3226        });
3227
3228        // Use a deliberately *long* batch.max_delay (250ms) so that a
3229        // partially-filled batch sitting in the batch worker's
3230        // pending state would survive past the old single-`max_delay`
3231        // sleep. min_size > burst forces the partial-batch path.
3232        let config = EventBusConfig::builder()
3233            .num_shards(2)
3234            .ring_buffer_capacity(1024)
3235            .batch(crate::config::BatchConfig {
3236                min_size: 1_000,
3237                max_size: 10_000,
3238                max_delay: Duration::from_millis(250),
3239                adaptive: false,
3240                velocity_window: Duration::from_millis(100),
3241            })
3242            .build()
3243            .unwrap();
3244        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3245
3246        // A small burst — far below `min_size`, so the batch worker
3247        // will sit on a partial batch waiting for `max_delay`.
3248        let burst = 50usize;
3249        let mut successes = 0u64;
3250        for i in 0..burst {
3251            if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3252                successes += 1;
3253            }
3254        }
3255
3256        // Time the flush call to confirm we waited long enough for
3257        // the partial batch to time out. The previous code slept
3258        // ~10ms total in the post-empty phase; the fix waits up to
3259        // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3260        let t0 = std::time::Instant::now();
3261        bus.flush().await.unwrap();
3262        let elapsed = t0.elapsed();
3263
3264        // After flush returns, every successful ingest must have
3265        // been delivered to the adapter. With the old code this
3266        // assertion would fail: events sit in the partial batch
3267        // until `max_delay` (250ms) elapses, but flush returned
3268        // after only ~10ms.
3269        let delivered = received.load(AtomicOrdering::SeqCst);
3270        assert_eq!(
3271            delivered, successes,
3272            "flush() returned but only {delivered} of {successes} \
3273             events reached the adapter (#16); flush waited {:?}",
3274            elapsed
3275        );
3276
3277        bus.shutdown().await.unwrap();
3278    }
3279
3280    /// Regression (Phase 1): when configured with a
3281    /// persistent `producer_nonce_path`, two bus instances launched
3282    /// against the same path stamp the SAME nonce on every emitted
3283    /// batch. JetStream / Redis adapters key dedup on this nonce, so
3284    /// a producer that crashed mid-batch and restarted (within the
3285    /// backend's dedup window) issues retries with the same msg-ids
3286    /// and the backend correctly recognizes them as duplicates.
3287    ///
3288    /// Pre-fix the per-process nonce regenerated on every startup,
3289    /// so post-crash retries wrote NEW msg-ids and the backend
3290    /// persisted the partial-batch's accepted half twice.
3291    #[tokio::test]
3292    async fn persistent_producer_nonce_survives_bus_restart() {
3293        // Use a per-test temp file so concurrent runs don't collide.
3294        let mut nonce_path = std::env::temp_dir();
3295        let pid = std::process::id();
3296        let nanos = std::time::SystemTime::now()
3297            .duration_since(std::time::UNIX_EPOCH)
3298            .map(|d| d.as_nanos())
3299            .unwrap_or(0);
3300        nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3301
3302        let make_config = |path: &std::path::Path| {
3303            EventBusConfig::builder()
3304                .num_shards(1)
3305                .ring_buffer_capacity(1024)
3306                .producer_nonce_path(path)
3307                .build()
3308                .unwrap()
3309        };
3310
3311        // First bus: ingest one event. Read its nonce off the
3312        // adapter-bound batch via a recording adapter.
3313        struct NonceRecordingAdapter {
3314            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3315        }
3316        #[async_trait::async_trait]
3317        impl crate::adapter::Adapter for NonceRecordingAdapter {
3318            async fn init(&mut self) -> Result<(), AdapterError> {
3319                Ok(())
3320            }
3321            async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3322                *self.nonce.lock() = Some(batch.process_nonce);
3323                Ok(())
3324            }
3325            async fn flush(&self) -> Result<(), AdapterError> {
3326                Ok(())
3327            }
3328            async fn shutdown(&self) -> Result<(), AdapterError> {
3329                Ok(())
3330            }
3331            async fn poll_shard(
3332                &self,
3333                _: u16,
3334                _: Option<&str>,
3335                _: usize,
3336            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3337                Ok(crate::adapter::ShardPollResult::empty())
3338            }
3339            fn name(&self) -> &'static str {
3340                "nonce-recording"
3341            }
3342        }
3343
3344        let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3345        {
3346            let bus = EventBus::new_with_adapter(
3347                make_config(&nonce_path),
3348                Box::new(NonceRecordingAdapter {
3349                    nonce: nonce_first_run.clone(),
3350                }),
3351            )
3352            .await
3353            .unwrap();
3354            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3355            bus.flush().await.unwrap();
3356            bus.shutdown().await.unwrap();
3357        }
3358
3359        let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3360        {
3361            let bus = EventBus::new_with_adapter(
3362                make_config(&nonce_path),
3363                Box::new(NonceRecordingAdapter {
3364                    nonce: nonce_second_run.clone(),
3365                }),
3366            )
3367            .await
3368            .unwrap();
3369            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3370            bus.flush().await.unwrap();
3371            bus.shutdown().await.unwrap();
3372        }
3373
3374        let n_a = nonce_first_run
3375            .lock()
3376            .expect("first bus must have dispatched a batch");
3377        let n_b = nonce_second_run
3378            .lock()
3379            .expect("second bus must have dispatched a batch");
3380        assert_eq!(
3381            n_a, n_b,
3382            "two bus instances against the same producer_nonce_path \
3383             must stamp the same nonce — pre-fix this regenerated on \
3384             every restart and JetStream's dedup window saw new \
3385             msg-ids as fresh batches",
3386        );
3387
3388        // Cleanup.
3389        let _ = std::fs::remove_file(&nonce_path);
3390    }
3391
3392    /// Pin that ALL spawn sites — both the static initial-shard
3393    /// loop in `new_with_adapter` and the dynamic-add path in
3394    /// `add_shard_internal` — clone the bus's loaded
3395    /// `producer_nonce` correctly. Pre-#56 there was no nonce
3396    /// concept at the bus layer; if any future refactor drops the
3397    /// `producer_nonce: self.producer_nonce` line from one of the
3398    /// spawn sites (or stops loading the persistent path), the
3399    /// post-scale-up shard's batches would carry a different nonce
3400    /// and JetStream's cross-restart dedup would silently break for
3401    /// events ingested into the dynamic shard. Pin all observed
3402    /// batches across the static + dynamic shards share the bus's
3403    /// nonce.
3404    #[tokio::test]
3405    async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3406        let mut nonce_path = std::env::temp_dir();
3407        let pid = std::process::id();
3408        let nanos = std::time::SystemTime::now()
3409            .duration_since(std::time::UNIX_EPOCH)
3410            .map(|d| d.as_nanos())
3411            .unwrap_or(0);
3412        nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3413
3414        struct CollectingAdapter {
3415            nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3416        }
3417        #[async_trait::async_trait]
3418        impl crate::adapter::Adapter for CollectingAdapter {
3419            async fn init(&mut self) -> Result<(), AdapterError> {
3420                Ok(())
3421            }
3422            async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3423                self.nonces.lock().push(batch.process_nonce);
3424                Ok(())
3425            }
3426            async fn flush(&self) -> Result<(), AdapterError> {
3427                Ok(())
3428            }
3429            async fn shutdown(&self) -> Result<(), AdapterError> {
3430                Ok(())
3431            }
3432            async fn poll_shard(
3433                &self,
3434                _: u16,
3435                _: Option<&str>,
3436                _: usize,
3437            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3438                Ok(crate::adapter::ShardPollResult::empty())
3439            }
3440            fn name(&self) -> &'static str {
3441                "collecting"
3442            }
3443        }
3444
3445        let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3446        let policy = ScalingPolicy {
3447            min_shards: 1,
3448            max_shards: 8,
3449            cooldown: Duration::from_nanos(1),
3450            ..Default::default()
3451        };
3452        let config = EventBusConfig::builder()
3453            .num_shards(2)
3454            .ring_buffer_capacity(1024)
3455            .scaling(policy)
3456            .producer_nonce_path(&nonce_path)
3457            .build()
3458            .unwrap();
3459
3460        let bus = EventBus::new_with_adapter(
3461            config,
3462            Box::new(CollectingAdapter {
3463                nonces: nonces.clone(),
3464            }),
3465        )
3466        .await
3467        .unwrap();
3468
3469        // Drive the two static shards.
3470        for i in 0..200u64 {
3471            let _ = bus.ingest(Event::new(json!({"i": i})));
3472        }
3473        bus.flush().await.unwrap();
3474
3475        // Add a dynamic shard and drive it too.
3476        let _ = bus.manual_scale_up(1).await.unwrap();
3477        for i in 200..400u64 {
3478            let _ = bus.ingest(Event::new(json!({"i": i})));
3479        }
3480        bus.flush().await.unwrap();
3481
3482        bus.shutdown().await.unwrap();
3483
3484        let observed = nonces.lock().clone();
3485        assert!(
3486            !observed.is_empty(),
3487            "expected the adapter to have observed at least one batch",
3488        );
3489        let first = observed[0];
3490        for (i, &n) in observed.iter().enumerate() {
3491            assert_eq!(
3492                n, first,
3493                "batch {i} stamped a different nonce ({n:#x}) than the first \
3494                 batch ({first:#x}) — at least one spawn site (initial-shard \
3495                 loop or `add_shard_internal`) failed to inherit the bus's \
3496                 producer_nonce",
3497            );
3498        }
3499
3500        let _ = std::fs::remove_file(&nonce_path);
3501    }
3502
3503    /// Regression guard for the `(process_nonce, shard_id,
3504    /// sequence_start, i)` JetStream / Redis msg-id construction.
3505    /// Within one bus instance, `sequence_start` per shard MUST be
3506    /// strictly monotonic across batches AND every two adjacent
3507    /// batches `n` and `n+1` from the same shard MUST satisfy
3508    /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3509    /// no overlap. A regression here breaks the at-most-once dedup
3510    /// every persistent adapter relies on: gaps reuse a `(shard,
3511    /// seq, i)` tuple after the dedup window closes, and overlap
3512    /// silently overlays a later batch on an earlier one's slot.
3513    ///
3514    /// The cross-restart variant (persistent `next_sequence` across
3515    /// process boots) is feature-shaped, not a bug fix — without
3516    /// persistence, restart relies on `process_nonce` rotating to
3517    /// disjoin the msg-id namespace (pinned by
3518    /// `persistent_producer_nonce_survives_bus_restart`). This test
3519    /// pins the within-process invariant the persistent nonce
3520    /// builds on.
3521    #[tokio::test]
3522    async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3523        struct ShardSeqRecorder {
3524            batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3525        }
3526        #[async_trait::async_trait]
3527        impl crate::adapter::Adapter for ShardSeqRecorder {
3528            async fn init(&mut self) -> Result<(), AdapterError> {
3529                Ok(())
3530            }
3531            async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3532                self.batches.lock().push((
3533                    batch.shard_id,
3534                    batch.sequence_start,
3535                    batch.events.len(),
3536                ));
3537                Ok(())
3538            }
3539            async fn flush(&self) -> Result<(), AdapterError> {
3540                Ok(())
3541            }
3542            async fn shutdown(&self) -> Result<(), AdapterError> {
3543                Ok(())
3544            }
3545            async fn poll_shard(
3546                &self,
3547                _: u16,
3548                _: Option<&str>,
3549                _: usize,
3550            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3551                Ok(crate::adapter::ShardPollResult::empty())
3552            }
3553            fn name(&self) -> &'static str {
3554                "shard-seq-recorder"
3555            }
3556        }
3557
3558        let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3559        let config = EventBusConfig::builder()
3560            .num_shards(4)
3561            .ring_buffer_capacity(1024)
3562            .build()
3563            .unwrap();
3564        let bus = EventBus::new_with_adapter(
3565            config,
3566            Box::new(ShardSeqRecorder {
3567                batches: batches.clone(),
3568            }),
3569        )
3570        .await
3571        .unwrap();
3572
3573        // Three drive-then-flush rounds so each shard sees multiple
3574        // batches; the across-batch monotonicity invariant is what
3575        // we're pinning, so a single batch per shard wouldn't
3576        // exercise it.
3577        for round in 0..3u64 {
3578            for i in 0..200u64 {
3579                bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3580            }
3581            bus.flush().await.unwrap();
3582        }
3583        bus.shutdown().await.unwrap();
3584
3585        let observed = batches.lock().clone();
3586        assert!(
3587            !observed.is_empty(),
3588            "expected the adapter to have observed at least one batch",
3589        );
3590
3591        // Bucket batches per shard, preserving dispatch order.
3592        let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3593            std::collections::HashMap::new();
3594        for (shard, seq_start, len) in observed {
3595            by_shard.entry(shard).or_default().push((seq_start, len));
3596        }
3597
3598        for (shard, runs) in &by_shard {
3599            assert!(
3600                !runs.is_empty(),
3601                "shard {shard}: must have at least one batch",
3602            );
3603            // First batch of every shard starts at the per-shard
3604            // zero — `BatchWorker::next_sequence` is initialized to
3605            // 0 and the first `flush()` reads it before the
3606            // `saturating_add`. A regression that lazily seeds
3607            // `next_sequence` from a non-zero source (e.g. wall
3608            // clock) would trip here.
3609            assert_eq!(
3610                runs[0].0, 0,
3611                "shard {shard}: first batch must start at sequence 0, got {}",
3612                runs[0].0,
3613            );
3614            for window in runs.windows(2) {
3615                let (prev_start, prev_len) = window[0];
3616                let (next_start, _next_len) = window[1];
3617                let expected_next = prev_start
3618                    .checked_add(prev_len as u64)
3619                    .expect("test bounds keep us well below u64::MAX");
3620                assert!(
3621                    next_start > prev_start,
3622                    "shard {shard}: sequence_start must be strictly monotonic; \
3623                     saw {prev_start} → {next_start}",
3624                );
3625                assert_eq!(
3626                    next_start, expected_next,
3627                    "shard {shard}: gap or overlap in sequence_start; \
3628                     prev=({prev_start}, len={prev_len}) → next={next_start}, \
3629                     expected {expected_next}. A gap reuses (shard, seq, i) \
3630                     tuples after the JetStream/Redis dedup window closes; an \
3631                     overlap silently overlays a later batch on an earlier \
3632                     one's slot.",
3633                );
3634            }
3635        }
3636    }
3637
3638    /// Pin the within-process caching contract for the fallback
3639    /// (no-`producer_nonce_path`) path: two bus instances created
3640    /// in the same process see the SAME `batch_process_nonce()`
3641    /// because the helper is `OnceLock`-cached. The
3642    /// "different-across-restarts" semantic is a *process-level*
3643    /// guarantee — restart the process to get a fresh nonce — and
3644    /// is pinned by `persistent_producer_nonce_survives_bus_restart`
3645    /// (which uses a path; the without-path branch of #56 has no
3646    /// cross-restart guarantee by design).
3647    ///
3648    /// Cubic-ai P3: this test was previously named
3649    /// `process_nonce_fallback_differs_across_bus_instances`, which
3650    /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
3651    /// Renamed to match what it actually pins.
3652    #[tokio::test]
3653    async fn process_nonce_fallback_is_cached_within_process() {
3654        struct NonceRecordingAdapter {
3655            nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3656        }
3657        #[async_trait::async_trait]
3658        impl crate::adapter::Adapter for NonceRecordingAdapter {
3659            async fn init(&mut self) -> Result<(), AdapterError> {
3660                Ok(())
3661            }
3662            async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3663                *self.nonce.lock() = Some(batch.process_nonce);
3664                Ok(())
3665            }
3666            async fn flush(&self) -> Result<(), AdapterError> {
3667                Ok(())
3668            }
3669            async fn shutdown(&self) -> Result<(), AdapterError> {
3670                Ok(())
3671            }
3672            async fn poll_shard(
3673                &self,
3674                _: u16,
3675                _: Option<&str>,
3676                _: usize,
3677            ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3678                Ok(crate::adapter::ShardPollResult::empty())
3679            }
3680            fn name(&self) -> &'static str {
3681                "nonce-recording"
3682            }
3683        }
3684
3685        let cfg = || {
3686            EventBusConfig::builder()
3687                .num_shards(1)
3688                .ring_buffer_capacity(1024)
3689                .build()
3690                .unwrap()
3691        };
3692
3693        let n_a = Arc::new(parking_lot::Mutex::new(None));
3694        let n_b = Arc::new(parking_lot::Mutex::new(None));
3695        {
3696            let bus = EventBus::new_with_adapter(
3697                cfg(),
3698                Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
3699            )
3700            .await
3701            .unwrap();
3702            bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3703            bus.flush().await.unwrap();
3704            bus.shutdown().await.unwrap();
3705        }
3706        {
3707            let bus = EventBus::new_with_adapter(
3708                cfg(),
3709                Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
3710            )
3711            .await
3712            .unwrap();
3713            bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3714            bus.flush().await.unwrap();
3715            bus.shutdown().await.unwrap();
3716        }
3717
3718        // Note: in a single-process test BOTH bus instances see the
3719        // same `OnceLock`-cached `batch_process_nonce`, so the
3720        // nonces ARE equal here even though the documented
3721        // semantic is "fresh per process." This test pins the
3722        // cached-within-a-process invariant; the across-PROCESSES
3723        // semantic is exercised by the persistent-nonce test
3724        // above (which is the actually-load-bearing path for the
3725        // persistent-nonce fix).
3726        let n_a = n_a.lock().unwrap();
3727        let n_b = n_b.lock().unwrap();
3728        assert_eq!(
3729            n_a, n_b,
3730            "within one process, batch_process_nonce is OnceLock-cached \
3731             so two bus instances see the same nonce — the \
3732             different-across-restarts contract is process-level, \
3733             pinned via `persistent_producer_nonce_survives_bus_restart`",
3734        );
3735    }
3736
3737    /// Regression: `EventBusStats::batches_dispatched`
3738    /// (and the new `events_dispatched`) must actually increment on
3739    /// every successful adapter dispatch. Pre-fix `batches_dispatched`
3740    /// was declared but never updated, so flush()'s Phase 2 progress
3741    /// gate was constant-zero and early-broke after one window —
3742    /// flake on Windows-class timer resolution. Pin both counters
3743    /// directly here so a future refactor that drops the increment
3744    /// fails this test, not the timing-dependent
3745    /// `flush_is_a_delivery_barrier`.
3746    #[tokio::test]
3747    async fn dispatch_increments_bus_level_event_and_batch_counters() {
3748        let received = Arc::new(AtomicU64::new(0));
3749        let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3750            received: received.clone(),
3751        });
3752
3753        let config = EventBusConfig::builder()
3754            .num_shards(2)
3755            .ring_buffer_capacity(1024)
3756            .batch(crate::config::BatchConfig {
3757                min_size: 1,
3758                max_size: 10,
3759                max_delay: Duration::from_millis(10),
3760                adaptive: false,
3761                velocity_window: Duration::from_millis(100),
3762            })
3763            .build()
3764            .unwrap();
3765        let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3766
3767        for i in 0..50 {
3768            bus.ingest(Event::new(json!({"i": i}))).unwrap();
3769        }
3770        bus.flush().await.unwrap();
3771
3772        let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
3773        let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
3774        assert!(
3775            batches > 0,
3776            "batches_dispatched must be > 0 after flush — pre-fix it was \
3777             never incremented anywhere, breaking flush()'s Phase 2 progress gate",
3778        );
3779        assert_eq!(
3780            events, 50,
3781            "events_dispatched must equal the number of events handed to \
3782             the adapter (got {events}, expected 50)",
3783        );
3784
3785        bus.shutdown().await.unwrap();
3786    }
3787
3788    /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
3789    /// still release the drain-finalize gate so detached drain
3790    /// workers can exit instead of parking on the gate until the
3791    /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
3792    /// keeps the `Drop` impl honest if someone refactors the
3793    /// shutdown gates later.
3794    #[tokio::test]
3795    async fn drop_releases_drain_finalize_gate_promptly() {
3796        let config = EventBusConfig::builder()
3797            .num_shards(2)
3798            .ring_buffer_capacity(1024)
3799            .build()
3800            .unwrap();
3801        let bus = EventBus::new(config).await.unwrap();
3802        let drain_gate = bus.drain_finalize_ready.clone();
3803
3804        // Drop without an awaited shutdown.
3805        drop(bus);
3806
3807        // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
3808        // is 10s; if Drop didn't flip the gate, drain workers would
3809        // park for up to that long before exiting.
3810        assert!(
3811            drain_gate.load(AtomicOrdering::Acquire),
3812            "Drop must release `drain_finalize_ready` so detached drain \
3813             workers exit promptly"
3814        );
3815    }
3816}