net/bus.rs
1//! Main EventBus facade.
2//!
3//! The EventBus provides a unified API for:
4//! - Event ingestion (non-blocking)
5//! - Event consumption (async polling with filtering)
6//! - Lifecycle management
7
8use crossbeam_utils::CachePadded;
9use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
10use std::sync::Arc;
11use std::time::Duration;
12
13use tokio::sync::mpsc;
14use tokio::task::JoinHandle;
15
16use crate::adapter::{Adapter, NoopAdapter};
17use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
18use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
19use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
20use crate::event::{Batch, Event, RawEvent};
21use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
22
23#[cfg(feature = "jetstream")]
24use crate::adapter::JetStreamAdapter;
25#[cfg(feature = "net")]
26use crate::adapter::NetAdapter;
27#[cfg(feature = "redis")]
28use crate::adapter::RedisAdapter;
29
/// The main event bus.
///
/// Owns the shard manager, the durable-storage adapter, the per-shard
/// drain/batch worker handles and channels, and the shutdown / in-flight
/// bookkeeping shared between ingestion and teardown.
///
/// # Example
///
/// ```rust,ignore
/// use net::{ConsumeRequest, Event, EventBus, EventBusConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let bus = EventBus::new(EventBusConfig::default()).await?;
///
///     // Ingest events
///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
///
///     // Poll events
///     let response = bus.poll(ConsumeRequest::new(100)).await?;
///
///     bus.shutdown().await?;
///     Ok(())
/// }
/// ```
pub struct EventBus {
    /// Shard manager for parallel ingestion.
    shard_manager: Arc<ShardManager>,
    /// Adapter for durable storage.
    adapter: Arc<dyn Adapter>,
    /// Poll merger for cross-shard consumption. Re-published (via
    /// `ArcSwap::store`) whenever the shard topology changes.
    poll_merger: arc_swap::ArcSwap<PollMerger>,
    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
    /// block in `add_shard_internal` / `remove_shard_internal`.
    /// Without this lock, two callers (e.g. scaling-monitor
    /// add_shard racing manual_scale_down's remove_shard) read the
    /// shard ids snapshot at slightly different points and then
    /// race on the `arc_swap.store`. The write that lands second
    /// can clobber the more-current view: T1 reads `{0..5}`, T2
    /// reads `{1..4}`, T2 stores `{1..4}`, T1 stores `{0..5}` —
    /// the published merger then routes polls to the just-removed
    /// shard 0 until the next topology change repairs the
    /// snapshot.
    poll_merger_swap_lock: parking_lot::Mutex<()>,
    /// Per-shard worker handles. Stored separately so shutdown can
    /// await drain workers *before* batch workers — the drain
    /// worker's final sweep races the batch worker's exit
    /// otherwise, and any events the drain worker pushes to the
    /// channel after the batch worker has stopped reading are
    /// silently lost.
    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
    /// Channels for sending batches to workers (shard_id -> sender).
    /// Each shard's drain worker holds its own clone of the same
    /// sender, so removing the entry here does not by itself close the
    /// channel.
    batch_senders: parking_lot::RwLock<
        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
    >,
    /// Shutdown flag.
    shutdown: Arc<AtomicBool>,
    /// Gate signaling drain workers that the in-flight wait has
    /// completed and they may safely run their final ring-buffer
    /// sweep. Distinct from `shutdown` because the drain worker
    /// observing `shutdown=true` alone is not enough: a producer
    /// that read `shutdown=false` may still be mid-push, and if the
    /// drain worker rushes through its final sweep before that push
    /// is visible the event is stranded. `shutdown()` sets this
    /// after waiting for `in_flight_ingests==0`, at which point the
    /// Acquire load on the drain side synchronizes-with the Release
    /// store here, transitively chaining through the SeqCst
    /// in-flight handshake to make every observed-pre-shutdown push
    /// visible to the drain worker's subsequent `pop_batch_into`.
    drain_finalize_ready: Arc<AtomicBool>,
    /// In-flight ingest counter. Incremented before each ingest's
    /// shutdown check and decremented after the push completes (or
    /// bails). `shutdown()` waits for this to drop to zero *after*
    /// setting `shutdown=true` and *before* setting
    /// `drain_finalize_ready=true` so no producer is mid-push when
    /// the drain workers do their final sweep — closing the race
    /// where a producer that observed `shutdown=false` could push
    /// *after* the drain worker's last `pop_batch_into` returned
    /// zero, leaving the event stranded in the ring buffer.
    /// Pre-fix this was `AtomicU32`. A 4-billion-in-flight
    /// wrap is not realistic in production, but the counter
    /// participates in the shutdown protocol — a wrap to 0 would
    /// trick the wait-for-zero loop into thinking shutdown was
    /// safe to proceed while producers were still mid-push.
    /// Widened to `AtomicU64` so the wrap is astronomical
    /// (1.8e19 in-flight ingests).
    ///
    /// **PERF_AUDIT §1.1** — Pre-fix this was a single shared
    /// `AtomicU64`, and every `ingest`/`ingest_raw` did a SeqCst
    /// `fetch_add` + a SeqCst `shutdown.load` + (on drop) a SeqCst
    /// `fetch_sub` against it. On a multi-producer bus the cache
    /// line ping-ponged across cores on every event, re-serializing
    /// what was otherwise a fully sharded path. Now striped across
    /// 32 cache-padded slots; each producer thread sticks to one slot
    /// (thread-local affinity), and the cold reader (`shutdown`
    /// wait-for-zero) sums the slots.
    in_flight_ingests: StripedInFlight,
    /// Set to `true` after `shutdown()` runs to completion. `Drop`
    /// uses this to detect "dropped without an awaited shutdown" —
    /// in that case events still in the ring buffers / mpsc channels
    /// are silently lost (see `Drop` impl).
    shutdown_completed: AtomicBool,
    /// Configuration the bus was built with.
    config: EventBusConfig,
    /// Counters, shared (via `Arc`) with every batch worker so
    /// successful dispatches can be recorded from the worker side.
    stats: Arc<EventBusStats>,
    /// Producer nonce. Loaded from
    /// `config.producer_nonce_path` on startup when the path is
    /// configured; otherwise falls back to the per-process default
    /// from `event::batch_process_nonce`. Stamped on every batch
    /// the bus emits — the worker spawn copies this u64 into
    /// `BatchWorkerParams::producer_nonce`, and
    /// `remove_shard_internal`'s stranded-flush uses it via
    /// `Batch::with_nonce`.
    producer_nonce: u64,
    /// Scaling monitor task handle; `None` until
    /// `start_scaling_monitor` spawns one.
    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
}
144
/// Worker handles for a single shard. The drain worker pumps
/// events from the ring buffer into an mpsc channel; the batch
/// worker reads from that channel and dispatches to the adapter.
/// Shutdown ordering is load-bearing — see `EventBus::shutdown`.
///
/// Teardown paths (shard removal, activate-failure rollback) move the
/// whole struct out of `EventBus::batch_workers` and await `drain`
/// before `batch`: the batch worker's channel only closes once the
/// drain worker has exited and dropped its sender clone.
struct ShardWorkers {
    /// Batch worker task: reads the channel and dispatches to the adapter.
    batch: JoinHandle<()>,
    /// Drain worker task: pumps the shard's ring buffer into the channel.
    drain: JoinHandle<()>,
    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
    /// `remove_shard_internal` reads this AFTER awaiting `batch`
    /// to learn the worker's final post-flush sequence, then uses
    /// it as the `sequence_start` for the stranded-ring-buffer
    /// flush so the stranded msg-ids fall strictly past every
    /// msg-id the worker emitted — without this, JetStream dedup
    /// would silently drop the stranded batch when both used 0.
    next_sequence: Arc<AtomicU64>,
}
161
162/// RAII guard for an in-flight ingest. Decrements
163/// `in_flight_ingests` on drop so `shutdown()` can wait for the
164/// counter to reach zero. Holds the slot index the matching
165/// `fetch_add` hit so the matching `fetch_sub` lands on the same
166/// slot — sub'ing from a different slot would still net-zero the
167/// sum but would skew slot occupancies under churn, eventually
168/// underflowing one slot to u64::MAX.
169struct IngestGuard<'a> {
170 bus: &'a EventBus,
171 slot: usize,
172}
173
174impl Drop for IngestGuard<'_> {
175 fn drop(&mut self) {
176 self.bus.in_flight_ingests.fetch_sub_at(self.slot, 1);
177 }
178}
179
/// Number of cache-padded slots the in-flight counter stripes
/// across. 32 is large enough to absorb typical core counts
/// without collisions; small enough that the cold `sum()` path
/// stays cheap.
///
/// Must be a power of two: the per-thread slot is chosen with
/// `hash & (IN_FLIGHT_STRIPE_SLOTS - 1)`, and that mask only reaches
/// every slot when the count is a power of two (any other value leaves
/// some slots permanently unused; zero would underflow the mask). The
/// compile-time assertion below turns such an edit into a build error
/// instead of a silently degraded stripe.
const IN_FLIGHT_STRIPE_SLOTS: usize = 32;

const _: () = assert!(
    IN_FLIGHT_STRIPE_SLOTS.is_power_of_two(),
    "IN_FLIGHT_STRIPE_SLOTS must be a power of two (slot selection masks the hash)"
);
185
/// Striped, cache-padded `in_flight_ingests` counter. Per
/// PERF_AUDIT §1.1 — splits the single contended `AtomicU64` into
/// `IN_FLIGHT_STRIPE_SLOTS` (32) cache-padded slots so producers on
/// different threads usually hit different cache lines (slots are chosen
/// by hashing the thread id, so two threads can still share one).
/// Shutdown's wait-for-zero loop sums the slots.
struct StripedInFlight {
    /// One counter per stripe, each wrapped in `CachePadded` so adjacent
    /// stripes do not share a cache line.
    slots: [CachePadded<AtomicU64>; IN_FLIGHT_STRIPE_SLOTS],
}
193
194impl StripedInFlight {
195 fn new() -> Self {
196 Self {
197 slots: std::array::from_fn(|_| CachePadded::new(AtomicU64::new(0))),
198 }
199 }
200
201 /// Pick a slot for this thread. Sticky per-thread via a
202 /// `thread_local!` cell so repeat calls from the same
203 /// producer thread hit the same cache line; first call
204 /// derives an index from `thread::current().id()`.
205 #[inline(always)]
206 fn slot_for_current_thread(&self) -> usize {
207 thread_local! {
208 static SLOT: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
209 }
210 SLOT.with(|s| {
211 if let Some(idx) = s.get() {
212 return idx;
213 }
214 use std::collections::hash_map::DefaultHasher;
215 use std::hash::{Hash, Hasher};
216 let mut h = DefaultHasher::new();
217 std::thread::current().id().hash(&mut h);
218 let idx = (h.finish() as usize) & (IN_FLIGHT_STRIPE_SLOTS - 1);
219 s.set(Some(idx));
220 idx
221 })
222 }
223
224 #[inline(always)]
225 fn fetch_add_at(&self, slot: usize, n: u64) {
226 self.slots[slot].fetch_add(n, AtomicOrdering::SeqCst);
227 }
228
229 #[inline(always)]
230 fn fetch_sub_at(&self, slot: usize, n: u64) {
231 self.slots[slot].fetch_sub(n, AtomicOrdering::SeqCst);
232 }
233
234 /// Sum across all slots with SeqCst loads. Cold path —
235 /// shutdown's wait-for-zero + reconciliation only.
236 fn sum(&self) -> u64 {
237 self.slots
238 .iter()
239 .map(|s| s.load(AtomicOrdering::SeqCst))
240 .sum()
241 }
242}
243
/// Event bus statistics.
///
/// **PERF_AUDIT §1.2** — Counters are `CachePadded` so the
/// producer-hot `events_ingested`/`events_dropped` don't false-
/// share with the batch-worker-hot
/// `batches_dispatched`/`events_dispatched`. Pre-fix all four sat
/// on 1-2 cache lines and every per-event `fetch_add` ping-ponged
/// the line between ingesting producers and the batch worker.
/// `CachePadded<AtomicU64>` derefs to `AtomicU64`, so external
/// callers using `stats.events_ingested.load(...)` work unchanged.
#[derive(Debug, Default)]
pub struct EventBusStats {
    /// Total events ingested.
    pub events_ingested: CachePadded<AtomicU64>,
    /// Events dropped due to backpressure. Also incremented when a
    /// shard-activation rollback has to abandon stranded ring-buffer
    /// events because a worker could not be joined in time.
    pub events_dropped: CachePadded<AtomicU64>,
    /// Batches dispatched to adapter.
    ///
    /// Pre-fix this field was declared but never incremented anywhere
    /// — `flush()`'s Phase 2 progress probe read it as
    /// "did the BatchWorker make progress this `max_delay` window?",
    /// observed `0 == 0`, and always early-broke after a single
    /// window. On Windows-class timer resolution the race against the
    /// BatchWorker's first `recv_timeout` tipped the wrong way for
    /// `flush_is_a_delivery_barrier` regularly. Now incremented in
    /// the BatchWorker spawn (after a successful `dispatch_batch`),
    /// in `remove_shard_internal`'s stranded-flush, and in the
    /// shard-activation rollback's stranded-flush.
    pub batches_dispatched: CachePadded<AtomicU64>,
    /// Total events dispatched to the adapter (sum of batch lengths
    /// from successful `on_batch` calls). Companion to
    /// `batches_dispatched` — by the time `flush()` returns,
    /// `events_dispatched + events_dropped == events_ingested`. FFI
    /// consumers can also use this to monitor end-to-end delivery.
    pub events_dispatched: CachePadded<AtomicU64>,
    /// Set to `true` if `shutdown()` / `shutdown_via_ref()` had to
    /// proceed past the in-flight-ingest grace deadline (5 s) with
    /// producers still mid-push. The stranded events are added to
    /// `events_dropped` (as they were before this flag existed), but
    /// `shutdown` returns `Ok(())` regardless — callers that need to
    /// distinguish "clean shutdown" from "lossy shutdown" check this
    /// flag in `bus.stats()` afterward (only meaningful for the
    /// `shutdown_via_ref` path that doesn't consume the bus).
    ///
    /// Also set by the shard-activation rollback in `add_shard_internal`
    /// when it has to drop stranded events because a worker JoinHandle
    /// timed out.
    ///
    /// Pre-fix the warning + `events_dropped` increment
    /// were the only signal; `Result<(), AdapterError>` returned
    /// `Ok` indistinguishable from a clean shutdown.
    pub shutdown_was_lossy: std::sync::atomic::AtomicBool,
}
292
293impl EventBus {
294 /// Create a new event bus with the given configuration.
295 pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
296 // Create adapter from config
297 let adapter: Box<dyn Adapter> = match &config.adapter {
298 AdapterConfig::Noop => Box::new(NoopAdapter::new()),
299 #[cfg(feature = "redis")]
300 AdapterConfig::Redis(redis_config) => {
301 Box::new(RedisAdapter::new(redis_config.clone())?)
302 }
303 #[cfg(feature = "jetstream")]
304 AdapterConfig::JetStream(js_config) => {
305 Box::new(JetStreamAdapter::new(js_config.clone())?)
306 }
307 #[cfg(feature = "net")]
308 AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
309 };
310
311 Self::new_with_adapter(config, adapter).await
312 }
313
314 /// Create a new event bus with a caller-supplied adapter.
315 ///
316 /// `config.adapter` is ignored — the supplied `adapter` is used
317 /// instead. Useful for tests that need to observe or inject
318 /// behavior at the adapter boundary (e.g. a counting adapter
319 /// for end-to-end delivery assertions, a flaky adapter for
320 /// retry-path coverage).
321 pub async fn new_with_adapter(
322 config: EventBusConfig,
323 mut adapter: Box<dyn Adapter>,
324 ) -> Result<Self, AdapterError> {
325 config
326 .validate()
327 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
328
329 // Initialize adapter (with timeout to prevent hanging on unreachable backends)
330 tokio::time::timeout(config.adapter_timeout, adapter.init())
331 .await
332 .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
333 let adapter: Arc<dyn Adapter> = Arc::from(adapter);
334
335 // Create shard manager (with or without dynamic scaling)
336 let shard_manager = if let Some(ref scaling_policy) = config.scaling {
337 Arc::new(
338 ShardManager::with_mapper(
339 config.num_shards,
340 config.ring_buffer_capacity,
341 config.backpressure_mode,
342 scaling_policy.clone(),
343 )
344 .map_err(|e| AdapterError::Fatal(e.to_string()))?,
345 )
346 } else {
347 Arc::new(ShardManager::new(
348 config.num_shards,
349 config.ring_buffer_capacity,
350 config.backpressure_mode,
351 ))
352 };
353
354 // Create poll merger.
355 //
356 // Pass the live id set rather than the count. At
357 // initial construction the ids are dense (`0..num_shards`),
358 // but using `shard_ids()` here keeps a single code path with
359 // the post-scaling re-stores below.
360 let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
361 adapter.clone(),
362 shard_manager.shard_ids(),
363 ));
364
365 // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
366 // doc on `EventBus` for the synchronization contract.
367 let shutdown = Arc::new(AtomicBool::new(false));
368 let drain_finalize_ready = Arc::new(AtomicBool::new(false));
369
370 // Stats are shared with every BatchWorker spawn so successful
371 // dispatches increment `batches_dispatched` / `events_dispatched`
372 // — `flush()`'s Phase 2 progress probe gates on those.
373 let stats = Arc::new(EventBusStats::default());
374
375 // Producer nonce. Persistent path → load-or-create
376 // the durable u64 so cross-process retries dedup against the
377 // prior incarnation. No path → per-process default (today's
378 // at-most-once-across-restart behavior).
379 let producer_nonce = match &config.producer_nonce_path {
380 Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
381 .map_err(|e| {
382 AdapterError::Fatal(format!(
383 "failed to load/create producer-nonce file at {}: {e}",
384 path.display(),
385 ))
386 })?
387 .nonce(),
388 None => crate::event::batch_process_nonce(),
389 };
390
391 // Create batch workers for each shard
392 let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
393 std::collections::HashMap::with_capacity(config.num_shards as usize);
394 let mut batch_senders =
395 std::collections::HashMap::with_capacity(config.num_shards as usize);
396
397 for shard_id in 0..config.num_shards {
398 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
399
400 let next_sequence = Arc::new(AtomicU64::new(0));
401
402 let batch = spawn_batch_worker(BatchWorkerParams {
403 shard_id,
404 rx,
405 adapter: adapter.clone(),
406 shard_manager: shard_manager.clone(),
407 config: config.batch.clone(),
408 adapter_timeout: config.adapter_timeout,
409 batch_retries: config.adapter_batch_retries,
410 next_sequence: next_sequence.clone(),
411 stats: stats.clone(),
412 producer_nonce,
413 });
414
415 let drain = spawn_drain_worker_for_shard(
416 shard_id,
417 shard_manager.clone(),
418 tx.clone(),
419 shutdown.clone(),
420 drain_finalize_ready.clone(),
421 );
422
423 batch_workers.insert(
424 shard_id,
425 ShardWorkers {
426 batch,
427 drain,
428 next_sequence,
429 },
430 );
431 batch_senders.insert(shard_id, tx);
432 }
433
434 let bus = Self {
435 shard_manager,
436 adapter,
437 poll_merger,
438 poll_merger_swap_lock: parking_lot::Mutex::new(()),
439 batch_workers: parking_lot::Mutex::new(batch_workers),
440 batch_senders: parking_lot::RwLock::new(batch_senders),
441 shutdown,
442 drain_finalize_ready,
443 in_flight_ingests: StripedInFlight::new(),
444 shutdown_completed: AtomicBool::new(false),
445 config,
446 stats,
447 producer_nonce,
448 scaling_monitor: parking_lot::Mutex::new(None),
449 };
450
451 Ok(bus)
452 }
453
454 /// Start the scaling monitor (if dynamic scaling is enabled).
455 /// This spawns a background task that periodically evaluates scaling decisions.
456 ///
457 /// The spawned task holds a `Weak<Self>` rather than a strong
458 /// `Arc<Self>` clone. With a strong clone the task kept the bus
459 /// alive forever, and `shutdown(self)` (which consumes by value)
460 /// was unreachable: callers with an `Arc<EventBus>` could not
461 /// `Arc::try_unwrap` to consume it because the spawned task always
462 /// held one of the strong refs.
463 ///
464 /// With a `Weak`, the monitor task upgrades each tick. Once the
465 /// last caller-held `Arc` is dropped, the upgrade fails and the
466 /// task exits cleanly. To shut down via `shutdown(self)`, the
467 /// caller must hold the only strong reference: `Arc::try_unwrap`
468 /// on the resulting bus succeeds because the spawned task only
469 /// holds a Weak.
470 pub fn start_scaling_monitor(self: &Arc<Self>) {
471 if self.config.scaling.is_none() {
472 return;
473 }
474
475 // Idempotency check: no-op when a monitor is already
476 // installed. Otherwise a second `start_scaling_monitor`
477 // call would overwrite the slot without aborting the
478 // previous `JoinHandle` — the displaced task would keep
479 // running, holding a `Weak<EventBus>`, only exiting when it
480 // next observed `shutdown` or failed to upgrade. Two
481 // concurrent monitors would briefly compete for
482 // `evaluate_scaling`'s lock, doubling the metrics-tick
483 // wakeup rate.
484 let mut slot = self.scaling_monitor.lock();
485 if slot.is_some() {
486 tracing::debug!("start_scaling_monitor: monitor already running, skipping");
487 return;
488 }
489
490 let weak = Arc::downgrade(self);
491 let handle = tokio::spawn(async move {
492 run_scaling_monitor_via_weak(weak).await;
493 });
494
495 *slot = Some(handle);
496 }
497
498 /// Internal: Add a new shard with its workers.
499 ///
500 /// The previous implementation called `shard_manager.add_shard()`
501 /// first, which atomically marked the shard `Active` and published
502 /// it to the routing table. So `select_shard` could route producer
503 /// pushes to the new id *before* any drain or batch worker existed,
504 /// leaving events queued in a buffer with no consumer (and
505 /// triggering the configured backpressure mode if the buffer
506 /// filled).
507 ///
508 /// The fix uses the two-phase API on `ShardManager`:
509 /// 1. `add_shard()` allocates the id and metrics collector,
510 /// adds the shard to the routing table in `Provisioning`
511 /// state — so `with_shard` works (which the drain worker
512 /// needs) but `select_shard` skips it.
513 /// 2. Spawn batch + drain workers and register the sender.
514 /// 3. `activate_shard()` flips state to `Active`. Only now
515 /// does `select_shard` start routing producer pushes.
516 async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
517 self.add_shard_internal_with_cooldown_policy(false).await
518 }
519
520 /// Like [`add_shard_internal`] but bypasses the auto-scaling
521 /// cooldown. See [`ShardManager::add_shard_force`].
522 async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
523 self.add_shard_internal_with_cooldown_policy(true).await
524 }
525
526 async fn add_shard_internal_with_cooldown_policy(
527 &self,
528 force: bool,
529 ) -> Result<u16, AdapterError> {
530 // Step 1: provisioning add — not yet selectable.
531 let new_id = if force {
532 self.shard_manager.add_shard_force()
533 } else {
534 self.shard_manager.add_shard()
535 }
536 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
537
538 // Step 2: spawn workers and register the sender.
539 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
540
541 let next_sequence = Arc::new(AtomicU64::new(0));
542
543 let batch = spawn_batch_worker(BatchWorkerParams {
544 shard_id: new_id,
545 rx,
546 adapter: self.adapter.clone(),
547 shard_manager: self.shard_manager.clone(),
548 config: self.config.batch.clone(),
549 adapter_timeout: self.config.adapter_timeout,
550 batch_retries: self.config.adapter_batch_retries,
551 next_sequence: next_sequence.clone(),
552 stats: self.stats.clone(),
553 producer_nonce: self.producer_nonce,
554 });
555
556 // Order of publish here matters. Today `select_shard`
557 // filters on `state == Active` and a shard is still
558 // `Provisioning` at this point, so producers cannot route
559 // to it yet — but the pattern (insert sender → spawn drain
560 // worker → activate_shard) is ordering-fragile: a future
561 // refactor that flips `activate_shard` ahead of the drain-
562 // worker spawn would leave a window where producers can
563 // push but no drain worker is polling. Pin the order by
564 // installing the sender BEFORE the drain worker (existing
565 // behaviour) so the worker observes a fully-published
566 // sender at spawn time; `activate_shard` runs further down.
567 self.batch_senders.write().insert(new_id, tx.clone());
568
569 let drain = spawn_drain_worker_for_shard(
570 new_id,
571 self.shard_manager.clone(),
572 tx,
573 self.shutdown.clone(),
574 self.drain_finalize_ready.clone(),
575 );
576
577 self.batch_workers.lock().insert(
578 new_id,
579 ShardWorkers {
580 batch,
581 drain,
582 next_sequence,
583 },
584 );
585
586 // Step 3: workers are live — flip the shard to Active so
587 // `select_shard` will route to it.
588 //
589 // On `activate_shard` failure we mirror
590 // `remove_shard_internal`'s teardown: drop the sender,
591 // unmap the provisioning entry (which atomically pops any
592 // residual ring-buffer events), gracefully await both
593 // workers (so the drain worker's `scratch` Vec sends its
594 // contents on the channel and the batch worker's
595 // `current_batch` is flushed via the channel-close path),
596 // then dispatch any stranded ring-buffer events through
597 // the adapter. Pre-fix this used `.abort()` on both
598 // handles which dropped the drain
599 // worker's scratch and the batch worker's current_batch
600 // without dispatch.
601 if let Err(e) = self.shard_manager.activate_shard(new_id) {
602 tracing::warn!(
603 shard_id = new_id,
604 error = %e,
605 "activate_shard failed; rolling back provisioning state"
606 );
607
608 // 1. Drop the bus-side sender. The drain worker still
609 // holds its own clone, so the channel stays open
610 // until `with_shard` returns None (step 2) and the
611 // drain worker breaks out of its loop, dropping
612 // its sender and finally closing the channel.
613 self.batch_senders.write().remove(&new_id);
614
615 // 2. Atomically pop any ring-buffer residue and
616 // unmap the Provisioning entry. After this,
617 // `with_shard(new_id)` returns None and the drain
618 // worker exits at its next poll (after sending
619 // any events it had already popped into `scratch`
620 // on this iteration).
621 //
622 // For a brand-new Provisioning shard the buffer
623 // should be empty (`select_shard` skips
624 // non-Active states), so `stranded` is normally
625 // `Vec::new()`. The flush below is a defensive
626 // no-op on the happy path but covers any future
627 // code path that routes to a Provisioning shard
628 // or any race window that tucked an event in
629 // before `activate_shard` returned its error.
630 let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
631
632 // 3. Take ownership of the worker handles and await
633 // them gracefully. Order: drain first (it pumps
634 // its scratch + final-sweep contents into the
635 // channel and exits), then batch (which receives
636 // those events plus any prior channel residue,
637 // flushes its own `current_batch`, and exits).
638 //
639 // Awaiting in this order is what makes the drain
640 // worker's scratch reach the adapter — the
641 // drain's `Some(N>0)` arm `mem::replace`s scratch
642 // into a batch and `sender.send(batch).await`s
643 // it; that send must complete (or fail) before
644 // the drain worker breaks. The batch worker's
645 // `Ok(None)` arm then runs after both senders
646 // are dropped and flushes any pending batch.
647 // Bound each JoinHandle await so a worker that's
648 // parked inside a slow adapter call (e.g. drain worker
649 // mid-`sender.send().await` against a backpressured
650 // channel because the batch worker is itself blocked
651 // in the adapter) cannot pin rollback indefinitely.
652 // 2x `adapter_timeout` is the natural ceiling: the
653 // batch worker uses `adapter_timeout` per dispatch and
654 // is expected to wake within that window. A timeout
655 // here leaks the JoinHandle — acceptable because
656 // step 1 already removed the bus-side sender, so the
657 // detached task can't observe new work and will exit
658 // on its next loop iteration.
659 let workers = self.batch_workers.lock().remove(&new_id);
660 // `worker_detached` is set when either join times out
661 // — the worker is still running on a leaked
662 // JoinHandle. In that case the `next_sequence` atomic
663 // is no longer a reliable upper bound: the detached
664 // worker may publish a final batch whose msg-ids land
665 // in the same `[next_sequence..N]` range we'd use for
666 // the stranded-flush, producing duplicate XADDs or
667 // JetStream dedup hits. Skip the stranded-flush when
668 // detached and surface the loss explicitly so the
669 // operator sees it.
670 let mut worker_detached = false;
671 let final_next_sequence = if let Some(workers) = workers {
672 let bound = self.config.adapter_timeout.saturating_mul(2);
673 match tokio::time::timeout(bound, workers.drain).await {
674 Ok(Ok(())) => {}
675 Ok(Err(err)) => {
676 tracing::warn!(
677 shard_id = new_id,
678 error = %err,
679 "drain worker JoinHandle errored on activate-failure rollback"
680 );
681 }
682 Err(_) => {
683 tracing::warn!(
684 shard_id = new_id,
685 timeout_ms = bound.as_millis() as u64,
686 "drain worker did not exit within timeout on activate-failure rollback; detaching"
687 );
688 worker_detached = true;
689 }
690 }
691 match tokio::time::timeout(bound, workers.batch).await {
692 Ok(Ok(())) => {}
693 Ok(Err(err)) => {
694 tracing::warn!(
695 shard_id = new_id,
696 error = %err,
697 "BatchWorker JoinHandle errored on activate-failure rollback"
698 );
699 }
700 Err(_) => {
701 tracing::warn!(
702 shard_id = new_id,
703 timeout_ms = bound.as_millis() as u64,
704 "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
705 );
706 worker_detached = true;
707 }
708 }
709 workers.next_sequence.load(AtomicOrdering::Acquire)
710 } else {
711 0
712 };
713
714 // 4. Dispatch any stranded events as a single-shot
715 // batch so they reach durable storage with the
716 // correct sequence-id segment. Identical to the
717 // `remove_shard_internal` teardown — but only when
718 // the worker actually exited. If it timed out and
719 // is still running on a leaked handle, dispatching
720 // here would emit msg-ids overlapping the worker's
721 // final flush; surface the events as dropped
722 // instead so the duplicate-on-the-wire hazard is
723 // avoided.
724 if !stranded.is_empty() && worker_detached {
725 let count = stranded.len();
726 self.stats
727 .events_dropped
728 .fetch_add(count as u64, AtomicOrdering::Relaxed);
729 self.stats
730 .shutdown_was_lossy
731 .store(true, AtomicOrdering::Release);
732 tracing::error!(
733 shard_id = new_id,
734 count,
735 "activate-failure rollback: skipping stranded-flush \
736 because a worker JoinHandle timed out and may still \
737 be running; events would collide with the detached \
738 worker's final batch on the wire. Counted as dropped."
739 );
740 } else if !stranded.is_empty() {
741 let count = stranded.len();
742 let batch = crate::event::Batch::with_nonce(
743 new_id,
744 stranded,
745 final_next_sequence,
746 self.producer_nonce,
747 );
748 let dispatched = dispatch_batch(
749 &*self.adapter,
750 Arc::new(batch),
751 new_id,
752 self.config.adapter_timeout,
753 self.config.adapter_batch_retries,
754 )
755 .await;
756 if dispatched {
757 self.stats
758 .batches_dispatched
759 .fetch_add(1, AtomicOrdering::Relaxed);
760 self.stats
761 .events_dispatched
762 .fetch_add(count as u64, AtomicOrdering::Relaxed);
763 tracing::info!(
764 shard_id = new_id,
765 count,
766 "activate-failure rollback: flushed stranded events to adapter",
767 );
768 } else {
769 tracing::error!(
770 shard_id = new_id,
771 count,
772 "activate-failure rollback: adapter rejected stranded events; \
773 events lost"
774 );
775 }
776 }
777
778 return Err(AdapterError::Fatal(e.to_string()));
779 }
780
781 // Update poll merger with the post-add id set. Hold
782 // `poll_merger_swap_lock` across the snapshot-and-store so a
783 // concurrent remove_shard can't sneak between our `shard_ids()`
784 // read and our `arc_swap.store` and clobber the published view
785 // with a stale snapshot.
786 {
787 let _swap_guard = self.poll_merger_swap_lock.lock();
788 self.poll_merger.store(Arc::new(PollMerger::new(
789 self.adapter.clone(),
790 self.shard_manager.shard_ids(),
791 )));
792 }
793
794 tracing::info!(shard_id = new_id, "Added new shard");
795 Ok(new_id)
796 }
797
798 /// Internal: Remove a stopped shard.
799 ///
800 /// Previously this dropped the worker `JoinHandle`s and unmapped
801 /// the shard without first draining its ring buffer. Any events
802 /// still queued at the moment of removal — even just a few from a
803 /// producer that pushed concurrently with the scale-down decision
804 /// — were silently stranded once the drain worker exited on
805 /// `with_shard → None`.
806 ///
807 /// The fix:
808 /// 1. Wait for the drain worker we're about to retire to flush
809 /// the channel, by closing the bus-side sender first.
810 /// 2. Call `remove_shard`, which atomically pops the
811 /// ring-buffer remnants and unmaps the shard. The drained
812 /// events come back to us as a `Vec`.
813 /// 3. Hand those events directly to the adapter as a
814 /// single-shot batch — bypassing the per-shard pipeline
815 /// that's already being torn down — so they reach durable
816 /// storage.
817 async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
818 // Step 1: drop the bus-side sender. The drain worker still
819 // has its own clone and will keep draining; we want it to
820 // exit when its `with_shard` call returns `None` after
821 // step 2's unmap.
822 self.batch_senders.write().remove(&shard_id);
823
824 // Step 2: atomically drain whatever's in the ring buffer and
825 // unmap. After this, `with_shard(shard_id)` returns `None`
826 // and the drain worker exits at its next poll.
827 let stranded = self
828 .shard_manager
829 .remove_shard(shard_id)
830 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
831
832 // Step 3: take ownership of the worker handles (move them
833 // OUT of the mutex map so we can `await` them — `await`
834 // consumes a `JoinHandle`). With the bus-side sender already
835 // dropped (step 1) and the shard unmapped (step 2), the
836 // drain worker exits at its next poll and drops its sender
837 // clone, which closes the BatchWorker's `rx`. The
838 // BatchWorker then flushes any pending `current_batch` and
839 // any events still buffered in the channel, dispatches them
840 // via the standard `dispatch_batch` path with their PROPER
841 // `next_sequence` values, and exits.
842 //
843 // Await order: drain first, then batch. The drain worker's
844 // `Some(N>0)` arm `mem::replace`s scratch into a batch and
845 // `sender.send(batch).await`s it; that send must complete
846 // (or fail) before drain breaks. The batch worker's
847 // `Ok(None)` arm runs after the sender drops and flushes
848 // any pending batch. Awaiting in the reverse order would
849 // park here forever: the batch worker's `recv()` only
850 // returns `None` once every sender clone (including the
851 // drain worker's) has dropped.
852 //
853 // Both awaits are bounded by `2 × adapter_timeout` so a
854 // worker parked inside a slow adapter call cannot pin
855 // teardown indefinitely. A timeout leaks the JoinHandle —
856 // acceptable because step 1 already removed the bus-side
857 // sender and step 2 unmapped the shard, so the detached
858 // task can't observe new work and will exit on its next
859 // loop iteration.
860 let workers = self.batch_workers.lock().remove(&shard_id);
861 let final_next_sequence = if let Some(workers) = workers {
862 let bound = self.config.adapter_timeout.saturating_mul(2);
863 match tokio::time::timeout(bound, workers.drain).await {
864 Ok(Ok(())) => {}
865 Ok(Err(e)) => {
866 tracing::warn!(
867 shard_id,
868 error = %e,
869 "drain worker JoinHandle errored on await; \
870 drain worker should have already exited via \
871 `with_shard -> None`",
872 );
873 }
874 Err(_) => {
875 tracing::warn!(
876 shard_id,
877 timeout_ms = bound.as_millis() as u64,
878 "drain worker did not exit within timeout on remove_shard; detaching",
879 );
880 }
881 }
882 match tokio::time::timeout(bound, workers.batch).await {
883 Ok(Ok(())) => {}
884 Ok(Err(e)) => {
885 tracing::warn!(
886 shard_id,
887 error = %e,
888 "BatchWorker JoinHandle errored on await; \
889 proceeding with stranded-flush using last \
890 published next_sequence",
891 );
892 }
893 Err(_) => {
894 tracing::warn!(
895 shard_id,
896 timeout_ms = bound.as_millis() as u64,
897 "BatchWorker did not exit within timeout on remove_shard; detaching",
898 );
899 }
900 }
901 workers.next_sequence.load(AtomicOrdering::Acquire)
902 } else {
903 // No worker registered for this shard — shouldn't
904 // happen on the normal scale-down path, but defensively
905 // fall back to 0. This branch only activates if a
906 // caller manages to call `remove_shard_internal` for a
907 // shard that was never spawned (or already removed).
908 0
909 };
910
911 // Step 4: flush the stranded ring-buffer events through the
912 // adapter in one shot, using `final_next_sequence` (NOT 0)
913 // as the `sequence_start`. The stranded batch's msg-ids
914 // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
915 // strictly past every msg-id the worker emitted. Using 0
916 // would collide with the worker's very first batch
917 // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
918 // window would silently drop the duplicate.
919 if !stranded.is_empty() {
920 let count = stranded.len();
921 // Use the bus's loaded producer nonce so the stranded
922 // batch's msg-ids share the same producer-identity
923 // segment as everything else this bus has emitted —
924 // critical for cross-process dedup.
925 let batch = crate::event::Batch::with_nonce(
926 shard_id,
927 stranded,
928 final_next_sequence,
929 self.producer_nonce,
930 );
931 let dispatched = dispatch_batch(
932 &*self.adapter,
933 Arc::new(batch),
934 shard_id,
935 self.config.adapter_timeout,
936 self.config.adapter_batch_retries,
937 )
938 .await;
939 if dispatched {
940 self.stats
941 .batches_dispatched
942 .fetch_add(1, AtomicOrdering::Relaxed);
943 self.stats
944 .events_dispatched
945 .fetch_add(count as u64, AtomicOrdering::Relaxed);
946 tracing::info!(
947 shard_id,
948 count,
949 sequence_start = final_next_sequence,
950 "Removed shard: flushed stranded ring-buffer events to adapter"
951 );
952 } else {
953 tracing::error!(
954 shard_id,
955 count,
956 sequence_start = final_next_sequence,
957 "Removed shard: adapter rejected stranded ring-buffer events; \
958 events lost"
959 );
960 }
961 }
962
963 // Update poll merger with the post-remove id set.
964 // Without this, a default-shards poll (`request.shards == None`)
965 // would still iterate `0..num_shards` and skip the live shard
966 // whose id is now the largest, while polling a nonexistent /
967 // recreated shard at the bottom of the range.
968 //
969 // `poll_merger_swap_lock` serializes against
970 // `add_shard_internal`'s matching block — see the field doc.
971 {
972 let _swap_guard = self.poll_merger_swap_lock.lock();
973 self.poll_merger.store(Arc::new(PollMerger::new(
974 self.adapter.clone(),
975 self.shard_manager.shard_ids(),
976 )));
977 }
978
979 tracing::info!(shard_id = shard_id, "Removed shard");
980 Ok(())
981 }
982
983 /// Try to enter an ingest critical section. Returns `None` if
984 /// shutdown is in progress, in which case the caller must
985 /// return `IngestionError::ShuttingDown` without touching the
986 /// shard manager.
987 ///
988 /// The `fetch_add` + load(`shutdown`) sequence pairs with
989 /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
990 /// `in_flight_ingests`. SeqCst on both sides closes the
991 /// stranding race: every ingest that is observed as in-flight
992 /// during shutdown's wait is guaranteed to complete before the
993 /// drain workers do their final ring-buffer sweep, so no event
994 /// can land in a ring buffer after the drain worker has stopped
995 /// reading from it.
996 #[inline(always)]
997 fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
998 let slot = self.in_flight_ingests.slot_for_current_thread();
999 self.in_flight_ingests.fetch_add_at(slot, 1);
1000 if self.shutdown.load(AtomicOrdering::SeqCst) {
1001 self.in_flight_ingests.fetch_sub_at(slot, 1);
1002 return None;
1003 }
1004 Some(IngestGuard { bus: self, slot })
1005 }
1006
1007 /// Ingest an event.
1008 ///
1009 /// This is a non-blocking operation. The event is added to the appropriate
1010 /// shard's ring buffer and will be batched and persisted asynchronously.
1011 ///
1012 /// # Returns
1013 ///
1014 /// The shard ID and insertion timestamp on success.
1015 #[inline]
1016 pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
1017 let _g = self
1018 .try_enter_ingest()
1019 .ok_or(IngestionError::ShuttingDown)?;
1020
1021 match self.shard_manager.ingest(event.into_inner()) {
1022 Ok((shard_id, ts)) => {
1023 self.stats
1024 .events_ingested
1025 .fetch_add(1, AtomicOrdering::Relaxed);
1026 Ok((shard_id, ts))
1027 }
1028 Err(e) => {
1029 self.stats
1030 .events_dropped
1031 .fetch_add(1, AtomicOrdering::Relaxed);
1032 Err(e)
1033 }
1034 }
1035 }
1036
1037 /// Ingest a raw event (pre-serialized with cached hash).
1038 ///
1039 /// This is the fastest ingestion path:
1040 /// - Uses pre-computed hash for shard selection (no serialization)
1041 /// - Stores bytes directly (no clone needed, reference-counted)
1042 ///
1043 /// # Returns
1044 ///
1045 /// The shard ID and insertion timestamp on success.
1046 #[inline]
1047 pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
1048 let _g = self
1049 .try_enter_ingest()
1050 .ok_or(IngestionError::ShuttingDown)?;
1051
1052 match self.shard_manager.ingest_raw(event) {
1053 Ok((shard_id, ts)) => {
1054 self.stats
1055 .events_ingested
1056 .fetch_add(1, AtomicOrdering::Relaxed);
1057 Ok((shard_id, ts))
1058 }
1059 Err(e) => {
1060 self.stats
1061 .events_dropped
1062 .fetch_add(1, AtomicOrdering::Relaxed);
1063 Err(e)
1064 }
1065 }
1066 }
1067
1068 /// Ingest a batch of events.
1069 ///
1070 /// This is more efficient than calling `ingest` repeatedly: events
1071 /// destined for the same shard share a single mutex acquisition.
1072 ///
1073 /// # Returns
1074 ///
1075 /// The number of successfully ingested events.
1076 pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
1077 // The shutdown gate lives in `ingest_raw_batch`, which we
1078 // forward to. No separate guard here — that would double-
1079 // count `in_flight_ingests` (once for this call, once for
1080 // the inner call) and could deadlock shutdown under high
1081 // contention.
1082 let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
1083 self.ingest_raw_batch(raw)
1084 }
1085
1086 /// Ingest a batch of raw events (fastest batch ingestion).
1087 ///
1088 /// Groups events by their destination shard and pushes each group
1089 /// under a single mutex acquisition.
1090 ///
1091 /// # Returns
1092 ///
1093 /// The number of successfully ingested events.
1094 pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1095 let _g = match self.try_enter_ingest() {
1096 Some(g) => g,
1097 None => return 0,
1098 };
1099
1100 let total = events.len();
1101 let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1102 if success > 0 {
1103 self.stats
1104 .events_ingested
1105 .fetch_add(success as u64, AtomicOrdering::Relaxed);
1106 }
1107 // Subtract `unrouted` from the buffer-fullness drop count
1108 // so the same event isn't tallied in both `events_unrouted`
1109 // (bumped inside `ShardManager::ingest_raw_batch`) and
1110 // `events_dropped` — using a plain `total - success` here
1111 // would double-count unrouted events. Backpressure drops
1112 // = events that reached a shard but failed to push.
1113 let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1114 if dropped > 0 {
1115 self.stats
1116 .events_dropped
1117 .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1118 }
1119 success
1120 }
1121
1122 /// Poll events from the bus.
1123 ///
1124 /// This retrieves events from storage according to the request parameters.
1125 ///
1126 /// # Topology-change visibility
1127 ///
1128 /// `ArcSwap::load()` snapshots the current `PollMerger` for
1129 /// the duration of this call. A concurrent `add_shard` /
1130 /// `remove_shard_internal` that `.store()`s a fresh merger
1131 /// only affects **subsequent** polls — this poll continues
1132 /// against the loaded snapshot's shard list.
1133 ///
1134 /// The implications:
1135 /// - **add_shard mid-poll:** events ingested into the new
1136 /// shard between the merger swap and our return are
1137 /// invisible to this call. They appear on the next poll.
1138 /// - **remove_shard_internal mid-poll:** the stale merger
1139 /// still has the removed shard in its id list. Adapters
1140 /// that lazy-create streams on `poll_shard` (JetStream
1141 /// in particular) may recreate the stream and return
1142 /// empty/stale data. The drained events are dispatched
1143 /// to durable storage by `remove_shard_internal` itself
1144 /// before this poll's adapter call lands; the next poll
1145 /// loads the new merger and sees the correct shard set.
1146 ///
1147 /// In both cases the loss is transient and self-healing:
1148 /// pagination via `next_id` and the next poll's cursor pick
1149 /// up where we left off. Callers requiring strict
1150 /// "topology-stable" semantics should serialize their polls
1151 /// against scaling operations externally.
1152 pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1153 let merger = self.poll_merger.load();
1154 merger.poll(request).await
1155 }
1156
1157 /// Get the number of shards.
1158 pub fn num_shards(&self) -> u16 {
1159 self.shard_manager.num_shards()
1160 }
1161
1162 /// Get the adapter name.
1163 pub fn adapter_name(&self) -> &'static str {
1164 self.adapter.name()
1165 }
1166
1167 /// Check if the adapter is healthy.
1168 pub async fn is_healthy(&self) -> bool {
1169 self.adapter.is_healthy().await
1170 }
1171
1172 /// Get statistics.
1173 pub fn stats(&self) -> &EventBusStats {
1174 &self.stats
1175 }
1176
1177 /// Get shard statistics.
1178 pub fn shard_stats(&self) -> crate::shard::ShardStats {
1179 self.shard_manager.stats()
1180 }
1181
1182 /// Sum of `len()` across every shard's ring buffer.
1183 ///
1184 /// Mainly useful in tests and operational diagnostics: a
1185 /// non-zero value at the time of `Drop` (without an awaited
1186 /// `shutdown()`) would be silently lost, so `Drop` folds this
1187 /// into `events_dropped` before the bus disappears.
1188 pub fn pending_in_rings(&self) -> u64 {
1189 self.shard_manager.total_pending_in_rings()
1190 }
1191
1192 /// Flush all pending batches.
1193 ///
1194 /// Waits for all shard ring buffers to drain, then for the
1195 /// per-shard mpsc channels to drain, then for any pending batch
1196 /// inside each batch worker to time out and dispatch — and only
1197 /// then calls `adapter.flush()`.
1198 ///
1199 /// # Latency bound
1200 ///
1201 /// The total wall-clock budget is the sum of three phases:
1202 /// * Phase 1 (ring-buffer drain): up to **5 s**.
1203 /// * Phase 2 (channel + pending-batch drain): up to
1204 /// `min(2 s, batch.max_delay × n_workers)` — capped at 2 s
1205 /// so a misconfigured `max_delay` cannot inflate the budget.
1206 /// * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
1207 /// (default **30 s**).
1208 ///
1209 /// Worst-case `flush()` runtime is therefore **~37 s under
1210 /// default config**, NOT 5 s as an earlier doc-comment stated.
1211 /// Callers wiring `flush()` into request-path latencies (HTTP
1212 /// handler, RPC) MUST set `adapter_timeout` accordingly or run
1213 /// the flush under their own outer timeout. The 5-second figure
1214 /// describes Phase 1 only; the doc was misleading and is fixed
1215 /// here.
1216 ///
1217 /// The previous implementation slept a single `batch.max_delay`
1218 /// (default 10 ms) after the ring buffers drained and immediately
1219 /// called `adapter.flush()`. Events still in transit through the
1220 /// per-shard mpsc channel, the batch worker's pending batch, or
1221 /// the in-progress `adapter.on_batch` call (bounded only by
1222 /// `adapter_timeout`, default 30 s) could miss the flush. Callers
1223 /// using `flush()` as a delivery barrier silently lost events.
1224 pub async fn flush(&self) -> Result<(), AdapterError> {
1225 let start = tokio::time::Instant::now();
1226 let timeout = Duration::from_secs(5);
1227 let mut backoff = Duration::from_micros(100);
1228
1229 // Phase 1: wait for ring buffers to drain (drain workers
1230 // pump them into the per-shard mpsc channels).
1231 loop {
1232 if self.shard_manager.all_shards_empty() {
1233 break;
1234 }
1235 if start.elapsed() >= timeout {
1236 tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
1237 break;
1238 }
1239 tokio::time::sleep(backoff).await;
1240 backoff = (backoff * 2).min(Duration::from_millis(10));
1241 }
1242
1243 // Phase 2: wait until every event ingested before flush()
1244 // started has been handed to the adapter via `on_batch`.
1245 // Snapshot the `events_ingested` counter at flush entry —
1246 // that's our target. We then poll `events_dispatched` (sum
1247 // of batch lengths from successful adapter dispatches) plus
1248 // `events_dropped` (events the adapter rejected after retry
1249 // exhaustion or that never made it past backpressure). The
1250 // barrier is met when `events_dispatched + events_dropped >=
1251 // target`: every pre-flush ingest is accounted for in one
1252 // bucket or the other.
1253 //
1254 // This is a true delivery barrier — it doesn't rely on
1255 // "no progress this window" heuristics that race a
1256 // BatchWorker whose `batch_start` was set just before
1257 // flush() ran. A progress gate that reads
1258 // `batches_dispatched` only works if that counter is
1259 // actually incremented on every dispatch, and Windows
1260 // timer resolution alone has historically made any
1261 // single-`max_delay`-sleep approach a frequent flake.
1262 let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1263 let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1264 let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1265
1266 // Outer deadline still bounds Phase 2 in case a wedged
1267 // adapter never returns. `max_delay * num_workers` is the
1268 // worst-case shape (one partially-filled batch per worker,
1269 // each waiting its full `max_delay` to time out), capped at
1270 // 2 s — same upper bound as before.
1271 //
1272 // Read the worker count via the shard manager's atomic-
1273 // backed `num_shards()` rather than `batch_workers.lock()
1274 // .len()`. The previous spinlock-backed `.lock()` inside
1275 // an `async fn` could stall the runtime worker thread
1276 // under contention with concurrent `add_shard_internal` /
1277 // `remove_shard_internal` callers; the atomic accessor is
1278 // both faster and async-safe. Mismatch in the
1279 // worker-count vs shard-count snapshot only changes the
1280 // phase2 deadline by at most one `max_delay` step, which
1281 // is bounded by the outer 2s cap regardless.
1282 let n_workers = usize::from(self.shard_manager.num_shards());
1283 let phase2_budget = self
1284 .config
1285 .batch
1286 .max_delay
1287 .saturating_mul(n_workers.max(1) as u32);
1288 let phase2_deadline =
1289 tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));
1290
1291 // Inner poll cadence: re-check the counters every 1 ms (or
1292 // `max_delay / 16`, whichever is larger). The fast cadence
1293 // means we exit promptly once the BatchWorker dispatches,
1294 // rather than waking exactly once per `max_delay` and
1295 // potentially racing the dispatch by a few ms.
1296 let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
1297 loop {
1298 let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1299 let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1300 // The barrier: every event ingested pre-flush has been
1301 // either dispatched or dropped. The bus's invariant
1302 // `events_ingested = events_dispatched + events_dropped`
1303 // holds at quiescence; we wait until
1304 // `dispatched + dropped >= target_ingested`.
1305 //
1306 // Pre-fix this used `dispatched + (dropped -
1307 // dropped_at_start) >= target_ingested`, which under-
1308 // counted by `dropped_at_start`: even after every
1309 // pre-flush event was processed, the inequality
1310 // required `dropped_at_start` MORE post-flush events
1311 // before signalling done. Workloads with no post-flush
1312 // ingest hung at the barrier until the deadline fired.
1313 //
1314 // Cross-shard race remains: a fast shard's post-flush
1315 // dispatches can satisfy the global target while a
1316 // slow shard's pre-flush events linger in its mpsc
1317 // channel or pending batch. `all_shards_empty()`
1318 // checks ring buffers but not those downstream
1319 // queues. Operators relying on flush as a hard
1320 // delivery barrier should call it during quiet
1321 // ingest, or in `shutdown` (which gates ingest via
1322 // `try_enter_ingest`).
1323 let _ = dispatched_at_start; // reserved for future per-shard accounting
1324 if dispatched.saturating_add(dropped) >= target_ingested
1325 && self.shard_manager.all_shards_empty()
1326 {
1327 break;
1328 }
1329 if tokio::time::Instant::now() >= phase2_deadline {
1330 tracing::warn!(
1331 target = target_ingested,
1332 dispatched,
1333 dropped = dropped.saturating_sub(dropped_at_start),
1334 "flush: Phase 2 deadline reached before all ingested events were dispatched",
1335 );
1336 break;
1337 }
1338 tokio::time::sleep(poll_interval).await;
1339 }
1340
1341 // Phase 3: tell the adapter to flush whatever it has
1342 // buffered. Bounded by `adapter_timeout` so a hanging
1343 // adapter can't pin us forever.
1344 match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
1345 Ok(r) => r,
1346 Err(_) => {
1347 tracing::warn!(
1348 "flush: adapter.flush timed out after {:?}",
1349 self.config.adapter_timeout
1350 );
1351 Err(AdapterError::Fatal("adapter flush timed out".into()))
1352 }
1353 }
1354 }
1355
1356 /// Gracefully shut down the event bus.
1357 ///
1358 /// The shutdown order is load-bearing:
1359 ///
1360 /// 1. Signal `shutdown` so drain workers stop pulling from
1361 /// ring buffers after their final sweep.
1362 /// 2. Await **drain workers** so every event the producer
1363 /// has handed to the bus is now in the per-shard mpsc
1364 /// channel.
1365 /// 3. Drop `batch_senders` so each channel's last sender is
1366 /// gone — the next `recv().await` in a batch worker will
1367 /// return `None`.
1368 /// 4. Await **batch workers**, which drain everything
1369 /// remaining in their channel and exit on `recv() = None`.
1370 ///
1371 /// Reversing steps 2 and 4 (the previous design) silently
1372 /// dropped events: a batch worker that exited on the shutdown
1373 /// flag could leave events the drain worker pushed *after* its
1374 /// `try_recv` sweep stranded in the channel.
1375 pub async fn shutdown(self) -> Result<(), AdapterError> {
1376 self.shutdown_via_ref().await
1377 }
1378
1379 /// Shutdown via shared reference — same semantics as
1380 /// [`shutdown`](Self::shutdown), but does not consume `self`.
1381 ///
1382 /// Useful for callers that hold the bus behind `Arc<EventBus>`
1383 /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1384 /// into every `EventStream`) and therefore cannot satisfy
1385 /// `Arc::try_unwrap`. Idempotent: the first caller does the
1386 /// work; concurrent or subsequent callers wait for the
1387 /// `shutdown_completed` flag and return `Ok(())`.
1388 pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1389 // 1. CAS the shutdown flag false→true. SeqCst pairs with
1390 // `try_enter_ingest`'s shutdown check — any producer that
1391 // observed the previous `false` and is mid-push has its
1392 // `in_flight_ingests` increment ordered before this store
1393 // (the CAS-success branch is a release of the new `true`),
1394 // and so will be visible to the wait below.
1395 //
1396 // If the CAS loses (someone else — typically a concurrent
1397 // call or `Drop` — already flipped the flag), spin until
1398 // they finish. We can't run the rest of the body because
1399 // workers/senders may already be partially torn down.
1400 if self
1401 .shutdown
1402 .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1403 .is_err()
1404 {
1405 // Bound the wait so a `Drop`-only path (which sets
1406 // `shutdown=true` but never sets `shutdown_completed`)
1407 // doesn't spin forever.
1408 //
1409 // Distinguish the two outcomes for callers. If
1410 // `shutdown_completed` flips inside the window, we
1411 // return `Ok(())` and the caller can be sure the first
1412 // caller finished. If the deadline fires first, we
1413 // surface `AdapterError::Transient(_)` — the bus IS
1414 // being shut down (the flag is set), but completion is
1415 // not yet observable; the caller can treat this as
1416 // "another thread is working on it, retry the
1417 // is_shutdown_completed() poll if you need a hard
1418 // barrier."
1419 //
1420 // Returning `Ok(())` in both branches would let
1421 // shutdown-done assumptions silently drift under a slow
1422 // adapter (`adapter_timeout` default 30 s > the 10 s
1423 // spin deadline), letting subsequent code observe a
1424 // partially-shut-down bus.
1425 // 10s in production builds; overridable via the
1426 // `_TEST_OVERRIDE_SHUTDOWN_VIA_REF_DEADLINE` thread-local
1427 // in test builds so the slow-first-caller test doesn't
1428 // need to wall-clock-wait for the full deadline. The
1429 // override is `#[cfg(test)]`-only; production cargo
1430 // builds compile out the override entirely.
1431 let deadline_dur = shutdown_via_ref_spin_deadline();
1432 // Use `tokio::time::Instant` so tests using
1433 // `tokio::time::pause()` virtualize this clock too.
1434 // Pre-fix `std::time::Instant` was wall-clock and
1435 // ignored `pause()`, breaking timeout-bounded tests
1436 // that wanted to fast-forward the spin deadline.
1437 let deadline = tokio::time::Instant::now() + deadline_dur;
1438 while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1439 if tokio::time::Instant::now() >= deadline {
1440 return Err(AdapterError::Transient(
1441 "shutdown_via_ref: another caller is mid-shutdown; \
1442 deadline elapsed before shutdown_completed \
1443 flipped. The bus IS shutting down; poll \
1444 is_shutdown_completed() if you need a hard \
1445 barrier."
1446 .into(),
1447 ));
1448 }
1449 // `yield_now` re-queues immediately and keeps the
1450 // task hot, starving the workers we're waiting on
1451 // under contention. A short `sleep` parks the task
1452 // and lets the runtime schedule the workers.
1453 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1454 }
1455 return Ok(());
1456 }
1457
1458 // 1a. Wait for in-flight ingests to drain BEFORE the drain
1459 // workers do their final ring-buffer sweep. Otherwise a
1460 // producer that observed `shutdown=false` could push *after*
1461 // the drain worker's last `pop_batch_into` returned zero,
1462 // leaving the event stranded in the ring buffer when the bus
1463 // is dropped.
1464 //
1465 // This is bounded: every producer either bails on the
1466 // SeqCst-synchronized shutdown check (no progress past the
1467 // increment) or completes its single non-blocking push and
1468 // decrements. Both paths take constant time; the total
1469 // wait is O(producer threads).
1470 //
1471 // The "every observed in-flight ingest completes before
1472 // the final sweep" property holds under normal conditions,
1473 // but the 5-second deadline below forces the gate open
1474 // even when producers are still in their push window. A
1475 // producer that has incremented `in_flight_ingests` (and
1476 // so observed `shutdown=false`) but whose push is delayed
1477 // past the deadline (heavy contention, debugger hit, etc.)
1478 // will complete its push AFTER the final sweep — its event
1479 // lands in the ring buffer and is never read. The deadline
1480 // exists so a stuck producer can't deadlock shutdown
1481 // indefinitely; the trade-off is documented data loss past
1482 // the 5 s window, surfaced via the `events_dropped` stat
1483 // (so the loss is observable to operators) and the `WARN`
1484 // log below (so it's diagnosable). The "no stranding"
1485 // promise on the happy path stands; the deadline path is
1486 // the documented escape hatch.
1487 // Use `tokio::time::Instant` so tests using
1488 // `tokio::time::pause()` virtualize this 5-second
1489 // deadline too — pre-fix the `std::time::Instant`
1490 // was wall-clock and ignored the test's paused clock.
1491 let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1492 // Snapshot of `(stranded, ingested, dispatched)` at the
1493 // deadline — Some(...) iff we hit the deadline path. The
1494 // post-drain reconciliation reads this to compute the
1495 // actual drop count (see comment further down).
1496 let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1497 while self.in_flight_ingests.sum() > 0 {
1498 if tokio::time::Instant::now() >= in_flight_deadline {
1499 let stranded = self.in_flight_ingests.sum();
1500 let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1501 let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1502 tracing::warn!(
1503 in_flight = stranded,
1504 lossy = true,
1505 "shutdown timed out waiting for in-flight ingests after 5s; \
1506 proceeding — up to {} events may strand in the ring buffer \
1507 past final drain (documented data-loss path)",
1508 stranded,
1509 );
1510 // Set the lossy flag immediately so a fast `is_*`
1511 // poll observes the outcome before the drain
1512 // finishes. The actual `events_dropped` bump is
1513 // deferred until after the final drain runs (see
1514 // "post-drain reconciliation" below) so we don't
1515 // double-count events that the drain still
1516 // successfully delivers — pre-fix this bumped
1517 // `events_dropped += stranded` here and the same
1518 // events that the final sweep then drained landed
1519 // in BOTH `events_ingested` and `events_dropped`,
1520 // breaking the bus's
1521 // `ingested == dispatched + dropped` invariant
1522 // and turning `shutdown_was_lossy` into a false
1523 // positive on every deadline-triggered shutdown.
1524 self.stats
1525 .shutdown_was_lossy
1526 .store(true, AtomicOrdering::Release);
1527 deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1528 break;
1529 }
1530 // Park instead of `yield_now`. The producers we're
1531 // waiting on contend for the same runtime threads;
1532 // re-queuing immediately starves their progress.
1533 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1534 }
1535
1536 // 1b. Release the drain-finalize gate.
1537 //
1538 // Pre-fix used `Ordering::Release` for this store
1539 // and relied on the SeqCst spin above (loading
1540 // `in_flight_ingests`) to provide the happens-before for
1541 // every observed-pre-shutdown push. That works today
1542 // because SeqCst loads carry an implicit fence, but a
1543 // future change to the spin's ordering (Relaxed for perf,
1544 // say) would silently break the drain worker's final-sweep
1545 // contract — producer pushes might not be visible to
1546 // `pop_batch_into`. Promote to SeqCst so the load-bearing
1547 // happens-before is explicit at this site, not derived
1548 // from another atomic's ordering choice.
1549 //
1550 // Caveat: the SeqCst happens-before above only covers the
1551 // *non-deadline* exit from the spin (the loop condition
1552 // observed `in_flight_ingests == 0`). On the deadline-break
1553 // path the loop exits with `in_flight_ingests > 0` — the
1554 // outstanding producer pushes have NOT been observed
1555 // synchronized-with this thread, and the SeqCst store below
1556 // does not retroactively create a happens-before with
1557 // pushes that are still mid-flight. Those events are
1558 // exactly the "stranded" events accounted via
1559 // `events_dropped` and `shutdown_was_lossy`; the contract
1560 // is "every observed-pre-shutdown push is visible to the
1561 // final sweep on the happy path; on the deadline path,
1562 // up to `stranded` events past the gate are surfaced as
1563 // dropped". The flag at line 1271 (`shutdown_was_lossy`)
1564 // is the operator-visible signal that this contract was
1565 // exercised on the lossy branch.
1566 self.drain_finalize_ready
1567 .store(true, AtomicOrdering::SeqCst);
1568
1569 // Stop the scaling monitor first — it's independent of the
1570 // ingestion path and just needs to observe the flag.
1571 let scaling_handle = self.scaling_monitor.lock().take();
1572 if let Some(handle) = scaling_handle {
1573 let _ = handle.await;
1574 }
1575
1576 // Take workers without holding the lock across await.
1577 let workers = std::mem::take(&mut *self.batch_workers.lock());
1578
1579 // 2. Await drain workers. Each one pops a final batch (up
1580 // to 10k events) from its ring buffer, sends it on the
1581 // channel, and exits. After this loop, every event in
1582 // the ring buffers has been pushed to its channel.
1583 //
1584 // `join_all` lets the runtime overlap drain handles. A
1585 // sequential `for ... { drain.await; }` would serialize
1586 // shutdown wall-clock as N×T instead of max(T), which on
1587 // the default 1024-shard config × per-shard final-drain
1588 // time makes shutdown painful.
1589 let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1590 .into_iter()
1591 .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1592 ((shard_id, drain), (shard_id, batch))
1593 })
1594 .unzip();
1595
1596 // Surface drain-worker JoinErrors explicitly. The default
1597 // Tokio runtime does NOT log spawned-task panics, so a
1598 // `let _ = join_all(...)` would silently swallow a panic
1599 // and mask stranded events. tracing::error per failure
1600 // makes the incident grep-able post-mortem.
1601 let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1602 let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1603 for (shard_id, result) in drain_ids
1604 .iter()
1605 .copied()
1606 .zip(futures::future::join_all(drain_handles).await)
1607 {
1608 if let Err(e) = result {
1609 tracing::error!(
1610 shard_id,
1611 error = %e,
1612 "drain worker JoinHandle errored on shutdown await"
1613 );
1614 }
1615 }
1616
1617 // 3. Drop the original senders so the channels close once
1618 // drain-worker sender clones (already dropped above)
1619 // are gone too. Without this, batch workers would block
1620 // on `recv().await` forever.
1621 drop(std::mem::take(&mut *self.batch_senders.write()));
1622
1623 // 4. Await batch workers. They drain their channel until
1624 // `recv() = None`, flush, and exit.
1625 //
1626 // Same parallelization as the drain phase, with the same
1627 // explicit JoinError surfacing.
1628 let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1629 for (shard_id, result) in drain_ids
1630 .into_iter()
1631 .zip(futures::future::join_all(batch_only).await)
1632 {
1633 if let Err(e) = result {
1634 tracing::error!(
1635 shard_id,
1636 error = %e,
1637 "BatchWorker JoinHandle errored on shutdown await"
1638 );
1639 }
1640 }
1641
1642 // Flush and shutdown adapter (with timeout to prevent hanging)
1643 let timeout = self.config.adapter_timeout;
1644 if tokio::time::timeout(timeout, self.adapter.flush())
1645 .await
1646 .is_err()
1647 {
1648 tracing::error!("Adapter flush timed out during shutdown");
1649 }
1650 let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1651 .await
1652 .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1653
1654 // Post-drain reconciliation for the lossy-shutdown path.
1655 //
1656 // If we hit the in-flight deadline above, `deadline_snapshot`
1657 // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1658 // Some of those `stranded` producers' events landed in the
1659 // ring AFTER our deadline check but BEFORE the
1660 // `drain_finalize_ready` gate flipped — those events are
1661 // now successfully ingested, drained, and dispatched through
1662 // the adapter. They appear in `events_dispatched` (the
1663 // delta since the deadline), so:
1664 //
1665 // actual_drops = stranded
1666 // - (dispatched_after_drain - dispatched@deadline)
1667 // - (ingested_after_drain - ingested@deadline)
1668 // only counting events that landed but
1669 // weren't dispatched (dropped under
1670 // backpressure, etc.)
1671 //
1672 // The cleaner reconciliation: events that completed
1673 // `try_enter_ingest` AFTER the deadline either completed
1674 // ingest (bumping `events_ingested`) or were dropped on
1675 // backpressure (bumping `events_dropped` from the existing
1676 // backpressure paths). The `stranded - delta_ingested`
1677 // remainder is producers whose `try_enter_ingest` succeeded
1678 // but never reached `shard_manager.ingest()` — those are
1679 // the genuinely-lost events we should account for.
1680 if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1681 let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1682 let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1683 // Known under-count window: a producer that completed
1684 // `shard_manager.ingest()` and bumped `events_ingested`
1685 // just BEFORE its `IngestGuard` decremented
1686 // `in_flight_ingests` will be counted in
1687 // `ingested_at_deadline` (already past the bump) AND in
1688 // `stranded` (still pending the decrement on that same
1689 // spin). That single event contributes `+1` to both
1690 // sides of the subtraction, so the saturating_sub
1691 // under-counts the drop by 1 per such interleaving.
1692 // The opposite direction (producer in-flight at
1693 // deadline that never completed ingest) is correctly
1694 // counted as a drop, so the bias is one-sided and
1695 // small (bounded by the number of producers mid-push
1696 // at the exact deadline moment — typically 0–1 even
1697 // under heavy load). Operators reading
1698 // `shutdown_was_lossy` get the right boolean; the
1699 // numeric `events_dropped` may under-count by a few
1700 // events on a lossy shutdown. Acceptable; the cost of
1701 // closing the window is paired-SeqCst reloads under
1702 // the spin which would dominate the shutdown path.
1703 let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1704 if actual_drops > 0 {
1705 self.stats
1706 .events_dropped
1707 .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1708 } else {
1709 // The deadline tripped the eager
1710 // `shutdown_was_lossy = true` set above, but the
1711 // final drain ingested every stranded event so
1712 // nothing was actually lost. Clear the flag so
1713 // operator dashboards alerting on
1714 // `was_lossy && events_dropped == 0` don't see a
1715 // false positive. Pre-fix the boolean stayed
1716 // `true` against `events_dropped == 0` for any
1717 // deadline-triggered shutdown whose drain
1718 // happened to catch up.
1719 self.stats
1720 .shutdown_was_lossy
1721 .store(false, AtomicOrdering::Release);
1722 }
1723 tracing::warn!(
1724 stranded_at_deadline = stranded,
1725 post_deadline_ingests,
1726 actual_drops,
1727 "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1728 by stranded - post-deadline-ingests (pre-fix this bumped by \
1729 the full `stranded` count, double-counting events the drain \
1730 still successfully delivered)",
1731 );
1732 }
1733
1734 // Mark shutdown as completed so Drop knows not to warn.
1735 self.shutdown_completed.store(true, AtomicOrdering::Release);
1736 result
1737 }
1738
1739 /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1740 /// not imply the shutdown work has finished. Use
1741 /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1742 /// completion.
1743 pub fn is_shutdown(&self) -> bool {
1744 self.shutdown.load(AtomicOrdering::Acquire)
1745 }
1746
1747 /// True once `shutdown` / `shutdown_via_ref` has fully drained
1748 /// workers and the adapter shutdown returned (success path only).
1749 pub fn is_shutdown_completed(&self) -> bool {
1750 self.shutdown_completed.load(AtomicOrdering::Acquire)
1751 }
1752
1753 /// Get shard metrics (if dynamic scaling is enabled).
1754 pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1755 self.shard_manager.collect_metrics()
1756 }
1757
1758 /// Check if dynamic scaling is enabled.
1759 pub fn is_dynamic_scaling_enabled(&self) -> bool {
1760 self.config.scaling.is_some()
1761 }
1762
1763 /// Manually trigger a scale-up (for testing or manual intervention).
1764 ///
1765 /// Bypasses the auto-scaling cooldown so a deliberate operator
1766 /// request isn't rate-limited by the auto-scaling cadence.
1767 /// Pre-fix this looped `add_shard_internal()` N times, each
1768 /// of which bumped `last_scaling`, so iteration 1+ failed
1769 /// with `InCooldown` against any non-zero cooldown — the
1770 /// first shard was left half-added (workers spawned, routing
1771 /// entry installed) while the error propagated to the
1772 /// caller. The `max_shards` budget check still applies.
1773 pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1774 let mut new_ids = Vec::with_capacity(count as usize);
1775 for _ in 0..count {
1776 let id = self.add_shard_internal_force().await?;
1777 new_ids.push(id);
1778 }
1779 Ok(new_ids)
1780 }
1781
1782 /// Manually trigger a scale-down (for testing or manual intervention).
1783 ///
1784 /// Marks `count` shards as `Draining`, waits for them to empty,
1785 /// finalizes them to `Stopped`, and removes them from the
1786 /// routing table — mirroring the scaling monitor's per-tick
1787 /// finalize loop. Returns the IDs of shards that were
1788 /// successfully drained AND removed (subset of those marked
1789 /// Draining if any failed to empty within the deadline).
1790 ///
1791 /// Drives the full scale-down lifecycle synchronously: a
1792 /// plain `mapper.scale_down` call marks shards `Draining` but
1793 /// does NOT finalize them — finalization is the scaling
1794 /// monitor's responsibility. Bus configs without an active
1795 /// monitor (or callers that shut down before the monitor's
1796 /// next tick) would otherwise lose any events queued in those
1797 /// shards' ring buffers.
1798 pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1799 let mapper = self
1800 .shard_manager
1801 .mapper()
1802 .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1803
1804 let drained_ids = mapper
1805 .scale_down(count)
1806 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1807
1808 // `finalize_draining` requires the shard to have been
1809 // Draining for >100ms with an empty ring buffer and no
1810 // pushes since drain start. Poll until every requested
1811 // shard finalizes, capped by an outer deadline so a wedged
1812 // producer can't pin this method forever. Use
1813 // `tokio::time::Instant` so tests under `tokio::time::pause()`
1814 // advance the virtual clock via `sleep` rather than spinning
1815 // until wall-clock catches up.
1816 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1817 let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1818 let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1819
1820 while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1821 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1822 let stopped = mapper.finalize_draining();
1823 // `finalize_draining` is destructive — every qualifying
1824 // Draining shard transitions to Stopped in one shot,
1825 // regardless of who initiated the drain. Pre-fix the
1826 // `if target.contains(&shard_id)` filter dropped non-
1827 // target ids on the floor; if the scaling monitor (or
1828 // a parallel `manual_scale_down` on a different target
1829 // set) finalized one of THEIR shards in the same tick,
1830 // that shard ended up Stopped with workers + routing
1831 // entry intact — leaked. Always tear down via
1832 // `remove_shard_internal` so the bus-side state
1833 // (workers, sender, routing) is freed; only count the
1834 // target subset toward the returned Vec.
1835 for shard_id in stopped {
1836 let _ = self.remove_shard_internal(shard_id).await;
1837 if target.contains(&shard_id) {
1838 finalized.insert(shard_id);
1839 }
1840 }
1841 }
1842
1843 // Surface partial success at WARN level. The return shape
1844 // is preserved for compat (changing to `(Vec, Vec)` would
1845 // break the existing callers + test) so a smaller-than-
1846 // target list could otherwise be silently mistaken for full
1847 // success; the WARN log gives operations tooling something
1848 // to alert on.
1849 if finalized.len() < target.len() {
1850 let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1851 tracing::warn!(
1852 requested = target.len(),
1853 finalized = finalized.len(),
1854 still_draining = ?still_draining,
1855 "manual_scale_down deadline elapsed before all targeted \
1856 shards finalized — events still in-flight on the listed \
1857 shards. They will finalize on the next scaling-monitor \
1858 tick or on shutdown."
1859 );
1860 }
1861
1862 Ok(finalized.into_iter().collect())
1863 }
1864}
1865
1866impl Drop for EventBus {
1867 fn drop(&mut self) {
1868 // Signal shutdown so background tasks (drain workers, batch
1869 // workers, scaling monitor) observe the flag and exit. We
1870 // cannot await futures in Drop, but setting the atomic flag
1871 // triggers eventual termination.
1872 //
1873 // Previously used `Release` here while `try_enter_ingest`
1874 // and `shutdown()` use `SeqCst` on the same flag. `&mut self`
1875 // exclusion makes that sound today (no concurrent ingest can
1876 // observe a half-published shutdown). The mismatch is purely
1877 // defensive — switching to `SeqCst` matches the rest of the
1878 // lifecycle and removes a footgun if a future change ever
1879 // opened a path where `Drop` overlaps an in-flight
1880 // `try_enter_ingest`.
1881 self.shutdown.store(true, AtomicOrdering::SeqCst);
1882 // Also release the drain-finalize gate so any drain worker
1883 // already parked waiting for it can proceed and exit. Without
1884 // this, drop-without-shutdown leaves drain workers blocked on
1885 // `drain_finalize_ready` until their internal deadline fires
1886 // (which delays task cleanup by `DRAIN_FINALIZE_TIMEOUT`).
1887 // Best-effort durability: drop never gets the in-flight wait,
1888 // so any push that lands after this point is still lost.
1889 self.drain_finalize_ready
1890 .store(true, AtomicOrdering::SeqCst);
1891
1892 // Workers do NOT hold `Arc<EventBus>` — they hold
1893 // independent `Arc<ShardManager>` / `Arc<dyn Adapter>`
1894 // clones plus the channel halves. When `Drop` returns,
1895 // those Arcs survive in the still-running tasks and they
1896 // continue draining / dispatching until they observe the
1897 // shutdown flag we just set. There's no partial-Drop UB
1898 // risk: nothing on the worker side dereferences a
1899 // dropped EventBus field. The flags promote the
1900 // worker tasks from "blocked on recv / parked on
1901 // drain_finalize_ready" to "observe shutdown=true and
1902 // exit" so they don't linger indefinitely.
1903 //
1904 // If `shutdown()` was never awaited, any events still in the
1905 // per-shard ring buffers or mpsc channels are lost — the
1906 // adapter's `flush()` and `shutdown()` won't run, so durable
1907 // backends never see them. We can't fix that from `Drop` (no
1908 // async), but we *can* surface the data-loss risk loudly so
1909 // it doesn't hide. The check is bounded to "shutdown was
1910 // never started"; an in-progress shutdown is fine because the
1911 // call site is awaiting it.
1912 if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1913 // Count events still sitting in shard ring buffers. They
1914 // are stranded — the drain workers will see `shutdown =
1915 // true` and exit without flushing, the adapter's
1916 // `flush()`/`shutdown()` never run, so anything in the
1917 // rings at this point is permanently lost. Surface that
1918 // loss via `events_dropped` so post-mortem stats reflect
1919 // reality (operators alerting on `events_dropped > 0`
1920 // would otherwise miss the entire incident), and set
1921 // `shutdown_was_lossy` so the boolean view is consistent
1922 // with the counter view.
1923 //
1924 // Events in the BatchWorker mpsc channels or pending
1925 // batches are not counted here — those workers may still
1926 // observe the shutdown flag and exit, but we have no
1927 // synchronous way from Drop to enumerate them. The ring-
1928 // buffer count is a lower bound on the stranded total.
1929 //
1930 // Use the non-blocking accessor: if `Drop` runs on a
1931 // thread that already holds a shard mutex (single-thread
1932 // runtime + panic during shutdown is the canonical
1933 // hazard), the blocking `total_pending_in_rings` would
1934 // self-deadlock. Best-effort accounting is the right
1935 // trade-off here — we'd rather under-report stranded
1936 // events than wedge the drop forever.
1937 let (stranded_in_rings, uncounted_shards) =
1938 self.shard_manager.try_total_pending_in_rings();
1939 if uncounted_shards > 0 {
1940 tracing::warn!(
1941 uncounted_shards,
1942 "EventBus::drop: {uncounted_shards} shard(s) were locked at \
1943 drop time and could not be accounted for in stranded_in_rings"
1944 );
1945 }
1946 if stranded_in_rings > 0 {
1947 self.stats
1948 .events_dropped
1949 .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
1950 self.stats
1951 .shutdown_was_lossy
1952 .store(true, AtomicOrdering::Release);
1953 }
1954
1955 let stats = self.shard_manager.stats();
1956 tracing::warn!(
1957 events_ingested = stats.events_ingested,
1958 events_dropped = stats.events_dropped,
1959 stranded_in_rings,
1960 "EventBus dropped without an awaited shutdown(). Any in-flight \
1961 events still in the ring buffers or batch channels will be lost \
1962 — the adapter's flush()/shutdown() never ran. Call \
1963 `bus.shutdown().await` before dropping for durable shutdown."
1964 );
1965 }
1966 }
1967}
1968
1969/// Spin deadline for the second-caller path in
1970/// `shutdown_via_ref`. 10s in production.
1971#[cfg(not(test))]
1972fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
1973 std::time::Duration::from_secs(10)
1974}
1975
1976/// Test-only override. Stored as milliseconds; `0` (the default)
1977/// means "use the production 10s". Set via
1978/// [`set_shutdown_via_ref_spin_deadline_for_test`] from inside a
1979/// test that needs to exercise the deadline-elapsed path without
1980/// wall-clock-waiting the full 10s.
1981///
1982/// This is a global atomic shared across all tests in the
1983/// `cargo test --lib` binary. If two tests touched it concurrently,
1984/// one's override would leak into the other's expectations. Tests
1985/// that use the override MUST take the
1986/// [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] mutex for the duration of
1987/// their override-setter / read window — see
1988/// [`set_shutdown_via_ref_spin_deadline_for_test`].
1989#[cfg(test)]
1990static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: std::sync::atomic::AtomicU64 =
1991 std::sync::atomic::AtomicU64::new(0);
1992
1993/// Serializes access to the deadline override so concurrent tests
1994/// can't observe each other's transient values. Tests that override
1995/// the deadline take this mutex via
1996/// [`shutdown_deadline_override_lock`] and hold the guard until
1997/// they reset the override to 0.
1998///
1999/// Uses `tokio::sync::Mutex` rather than `std::sync::Mutex` so
2000/// the guard can legitimately be held across `.await` points
2001/// while the guarded test runs (clippy::await_holding_lock would
2002/// otherwise fire on the std variant — and rightly so for
2003/// production code).
2004#[cfg(test)]
2005static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
2006
2007/// Acquire the guard mutex protecting the deadline-override
2008/// static. Tests touching the override hold the returned guard
2009/// across both the set and the reset, so concurrent tests
2010/// observe a consistent default.
2011#[cfg(test)]
2012pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
2013 SHUTDOWN_DEADLINE_OVERRIDE_GUARD.lock().await
2014}
2015
2016#[cfg(test)]
2017fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
2018 let ms = SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(std::sync::atomic::Ordering::Relaxed);
2019 if ms == 0 {
2020 std::time::Duration::from_secs(10)
2021 } else {
2022 std::time::Duration::from_millis(ms)
2023 }
2024}
2025
2026#[cfg(test)]
2027pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
2028 SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.store(ms, std::sync::atomic::Ordering::Relaxed);
2029}
2030
2031#[cfg(test)]
2032impl EventBus {
2033 /// Test-only: bump `in_flight_ingests` by `n` to simulate
2034 /// stranded producers mid-call to `shard_manager.ingest`.
2035 /// The shutdown spin loop reads `in_flight_ingests` to decide
2036 /// whether work is still pending; the lossy-shutdown
2037 /// reconciliation tests need to drive that counter without
2038 /// the matching real `try_enter_ingest` / `ingest_complete`
2039 /// machinery, which would also bump `events_ingested` and
2040 /// confuse the (stranded, post_deadline_ingests) accounting.
2041 ///
2042 /// Tests own the lifecycle: every `stage_stranded_ingest(n)`
2043 /// MUST be paired with `release_stranded_ingest(n)` before
2044 /// the bus drops, otherwise the `Drop` impl's invariants
2045 /// fire.
2046 pub(crate) fn stage_stranded_ingest(&self, n: u64) {
2047 // Stripe-aware: tests don't care which slot the staged
2048 // count lands in; slot 0 is fine and `release_*` /
2049 // `complete_*` sub from the same slot for net-zero
2050 // accounting. The slot 0 placement is independent of any
2051 // real producer's `slot_for_current_thread()` selection,
2052 // which would (post-fix) most likely hit a different slot
2053 // — that's also fine, the sum is what `shutdown` reads.
2054 self.in_flight_ingests.fetch_add_at(0, n);
2055 }
2056
2057 /// Test-only: counterpart to [`stage_stranded_ingest`].
2058 /// Decrements `in_flight_ingests` by `n` without bumping
2059 /// `events_ingested`. Use when simulating producers that
2060 /// were stranded by the shutdown deadline and never
2061 /// completed (the drain reconciliation should classify them
2062 /// as drops).
2063 pub(crate) fn release_stranded_ingest(&self, n: u64) {
2064 // Mirror of `stage_stranded_ingest`: sub from slot 0.
2065 self.in_flight_ingests.fetch_sub_at(0, n);
2066 }
2067
2068 /// Test-only: simulate a previously-stranded producer
2069 /// completing AFTER the shutdown deadline. Bumps
2070 /// `events_ingested` first (Release ordering — the
2071 /// post-drain reconciliation reads it with Acquire), then
2072 /// releases the in-flight slot. The order matches what a
2073 /// real producer's `ingest` call site does, so the
2074 /// reconciliation's (ingested_after - ingested_at_deadline)
2075 /// arithmetic reads the same way.
2076 pub(crate) fn complete_stranded_ingest(&self, n: u64) {
2077 self.stats
2078 .events_ingested
2079 .fetch_add(n, std::sync::atomic::Ordering::Release);
2080 // Mirror of `stage_stranded_ingest`: sub from slot 0.
2081 self.in_flight_ingests.fetch_sub_at(0, n);
2082 }
2083}
2084
2085async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
2086 // Refresh `interval` from the policy on every tick. The previous
2087 // version cached it once at task start, so any future runtime
2088 // policy update would not be adopted by the monitor without a
2089 // process restart. Today `EventBusConfig` is immutable
2090 // post-construction so this is a no-op — but reading it each tick
2091 // is cheap (one atomic / RwLock read) and removes the latent
2092 // footgun.
2093 loop {
2094 let interval = match weak.upgrade() {
2095 Some(bus) => match &bus.config.scaling {
2096 Some(p) => p.metrics_window,
2097 None => return,
2098 },
2099 None => return,
2100 };
2101 tokio::time::sleep(interval).await;
2102
2103 let bus = match weak.upgrade() {
2104 Some(b) => b,
2105 // Last strong ref dropped — caller is shutting down (or
2106 // already gone). Exit cleanly.
2107 None => break,
2108 };
2109
2110 // SeqCst to match the writer side (`EventBus::shutdown` /
2111 // `Drop`). The Acquire/Release handshake on
2112 // `drain_finalize_ready` already provides the load-bearing
2113 // happens-before today — but a future code change that
2114 // piggybacks on `shutdown`'s ordering (e.g. a producer that
2115 // observes shutdown without going through
2116 // `try_enter_ingest`) would silently break under Relaxed.
2117 // Aligning the read-side ordering with the writer-side
2118 // SeqCst is a one-instruction tax for the safety.
2119 if bus.shutdown.load(AtomicOrdering::SeqCst) {
2120 break;
2121 }
2122
2123 // Collect metrics for observability.
2124 if let Some(metrics) = bus.shard_manager.collect_metrics() {
2125 for m in &metrics {
2126 if m.fill_ratio > 0.5 {
2127 tracing::debug!(
2128 shard_id = m.shard_id,
2129 fill_ratio = m.fill_ratio,
2130 event_rate = m.event_rate,
2131 "Shard metrics"
2132 );
2133 }
2134 }
2135 }
2136
2137 // Evaluate scaling.
2138 match bus.shard_manager.evaluate_scaling() {
2139 ScalingDecision::ScaleUp(count) => {
2140 tracing::info!(count = count, "Scaling up shards");
2141 for _ in 0..count {
2142 if let Err(e) = bus.add_shard_internal().await {
2143 tracing::error!(error = %e, "Failed to add shard");
2144 break;
2145 }
2146 }
2147 }
2148 ScalingDecision::ScaleDown(count) => {
2149 tracing::info!(count = count, "Scaling down shards");
2150 if let Some(mapper) = bus.shard_manager.mapper() {
2151 if let Ok(drained) = mapper.scale_down(count) {
2152 for shard_id in drained {
2153 let _ = bus.shard_manager.drain_shard(shard_id);
2154 }
2155 }
2156 }
2157 }
2158 ScalingDecision::None => {}
2159 }
2160
2161 if let Some(mapper) = bus.shard_manager.mapper() {
2162 let stopped = mapper.finalize_draining();
2163 for shard_id in stopped {
2164 let _ = bus.remove_shard_internal(shard_id).await;
2165 }
2166 }
2167
2168 // CRITICAL: drop the strong ref BEFORE the next sleep so a
2169 // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
2170 // the last strong ref while we're sleeping.
2171 drop(bus);
2172 }
2173}
2174
2175/// Spawn a batch worker for a shard.
2176/// Dispatch a batch to the adapter with timeout and optional retries.
2177/// Returns true if the batch was accepted, false if all attempts failed.
2178///
2179/// Non-retryable errors (e.g. `AdapterError::Connection`,
2180/// `AdapterError::Fatal`, `AdapterError::Serialization`) skip the
2181/// retry loop and drop the batch immediately. Retrying a fatal error
2182/// just delays the inevitable while burning CPU and amplifying log
2183/// noise. Use `AdapterError::is_retryable` as the single source of
2184/// truth for this decision.
2185/// Compute the per-attempt backoff for `dispatch_batch` retries.
2186///
2187/// Pre-fix the retry loop slept a flat `Duration::from_millis(100)`
2188/// after every failure. Under a partial backend outage (Redis /
2189/// JetStream slow but not dead), every shard's BatchWorker retried
2190/// on the exact same 100 ms cadence, producing a synchronized
2191/// retry storm that amplified load while the backend was
2192/// recovering.
2193///
2194/// Post-fix: exponential backoff (100, 200, 400, 800, 1600, 3200 ms)
2195/// with per-(shard, attempt) jitter to decorrelate retries across
2196/// shards. Capped at attempt=5 (3.2 s base) so retries don't grow
2197/// unboundedly. Jitter is `[-25%, +25%]` of the base, derived from
2198/// hashing `(shard_id, attempt)` — deterministic per shard but
2199/// uncorrelated across shards, which is exactly what the storm
2200/// mitigation needs.
2201fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
2202 use std::collections::hash_map::DefaultHasher;
2203 use std::hash::{Hash, Hasher};
2204
2205 // 100 ms × 2^attempt, capped at attempt=5 → 100/200/400/800/1600/3200.
2206 let base_ms: u64 = 100u64.saturating_mul(1u64 << attempt.min(5));
2207
2208 let mut hasher = DefaultHasher::new();
2209 shard_id.hash(&mut hasher);
2210 attempt.hash(&mut hasher);
2211 let h = hasher.finish();
2212 // Jitter in [0, base_ms/2), centered to give [-25%, +25%].
2213 let jitter_range = (base_ms / 2).max(1);
2214 let jitter = (h % jitter_range) as i64 - (jitter_range as i64 / 2);
2215 let final_ms = (base_ms as i64 + jitter).max(1) as u64;
2216
2217 Duration::from_millis(final_ms)
2218}
2219
2220async fn dispatch_batch(
2221 adapter: &dyn Adapter,
2222 batch: Arc<Batch>,
2223 shard_id: u16,
2224 timeout: Duration,
2225 retries: u32,
2226) -> bool {
2227 // Retry attempts clone the `Arc` (refcount bump only); the final
2228 // attempt moves it. Pre-fix the batch was `Batch` and every retry
2229 // deep-cloned the events `Vec` — at the common `retries == 0`
2230 // path this was a wasted N×Bytes refcount bump + Vec alloc on
2231 // every batch.
2232 for attempt in 0..retries {
2233 match tokio::time::timeout(timeout, adapter.on_batch(Arc::clone(&batch))).await {
2234 Ok(Ok(())) => return true,
2235 Ok(Err(e)) => {
2236 if !e.is_retryable() {
2237 // Tag with a `reason` field so this
2238 // distinct drop cause is separately filterable
2239 // from retry-exhausted and timeout in
2240 // observability tools. Shutdown is distinguished
2241 // from generic non-retryable so an operator
2242 // chasing "why are batches being dropped" can
2243 // immediately tell a sequencing bug (sending to
2244 // a stopped adapter) from a transport / config
2245 // failure.
2246 let reason = if e.is_shutdown() {
2247 "adapter_shutdown"
2248 } else {
2249 "non_retryable"
2250 };
2251 tracing::error!(
2252 shard_id,
2253 error = %e,
2254 attempt,
2255 reason,
2256 "Non-retryable error from adapter, dropping batch"
2257 );
2258 return false;
2259 }
2260 tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2261 }
2262 Err(_) => {
2263 tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2264 }
2265 }
2266 tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2267 }
2268
2269 // Pre-fix the final attempt collapsed every drop into
2270 // one log line ("Failed to dispatch batch, dropping"), making
2271 // it impossible to tell retry-exhausted from fatal-non-
2272 // retryable from timeout in metrics. The non-retryable case
2273 // already has its own log inside the retry loop above (early
2274 // return); here we tag retry-exhausted vs timeout-after-
2275 // retries with distinct `reason` fields so log-based
2276 // observability tools can break the drops out by cause.
2277 match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2278 Ok(Ok(())) => true,
2279 Ok(Err(e)) => {
2280 tracing::error!(
2281 shard_id,
2282 error = %e,
2283 reason = "retry_exhausted",
2284 attempts = retries + 1,
2285 "Failed to dispatch batch after exhausting retries, dropping"
2286 );
2287 false
2288 }
2289 Err(_) => {
2290 tracing::error!(
2291 shard_id,
2292 reason = "timeout",
2293 attempts = retries + 1,
2294 "Adapter on_batch timed out on final attempt, dropping batch"
2295 );
2296 false
2297 }
2298 }
2299}
2300
2301struct BatchWorkerParams {
2302 shard_id: u16,
2303 rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
2304 adapter: Arc<dyn Adapter>,
2305 shard_manager: Arc<ShardManager>,
2306 config: BatchConfig,
2307 adapter_timeout: Duration,
2308 batch_retries: u32,
2309 /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
2310 /// stores its post-flush sequence here on every dispatch so the
2311 /// bus can read it after the worker exits — see
2312 /// `ShardWorkers::next_sequence` for the consumer side.
2313 next_sequence: Arc<AtomicU64>,
2314 /// Bus-level stats. The worker increments
2315 /// `batches_dispatched` and `events_dispatched` after every
2316 /// successful `dispatch_batch`. Both must actually be
2317 /// incremented here, otherwise `flush()`'s Phase 2 progress
2318 /// probe would always observe zero progress and early-break
2319 /// after a single `max_delay` window — racing the
2320 /// BatchWorker's first `recv_timeout` and flaking on
2321 /// Windows-class timer resolution.
2322 stats: Arc<EventBusStats>,
2323 /// Producer nonce stamped on every batch the worker emits.
2324 /// Bus-loaded from the persistent path when
2325 /// `producer_nonce_path` is configured, otherwise from the
2326 /// per-process default.
2327 producer_nonce: u64,
2328}
2329
2330fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2331 let BatchWorkerParams {
2332 shard_id,
2333 mut rx,
2334 adapter,
2335 shard_manager,
2336 config,
2337 adapter_timeout,
2338 batch_retries,
2339 next_sequence,
2340 stats,
2341 producer_nonce,
2342 } = params;
2343 tokio::spawn(async move {
2344 let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2345
2346 loop {
2347 // Wait for events with timeout. The batch worker exits
2348 // only when its channel is closed — i.e. after every
2349 // upstream sender (the drain worker for this shard +
2350 // `EventBus::batch_senders`) has been dropped.
2351 // `EventBus::shutdown` enforces that ordering so no
2352 // event is left stranded in the channel.
2353 let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2354
2355 match tokio::time::timeout(recv_timeout, rx.recv()).await {
2356 Ok(Some(events)) => {
2357 if let Some(batch) = worker.add_events(events) {
2358 let batch_len = batch.len() as u64;
2359 if dispatch_batch(
2360 &*adapter,
2361 Arc::new(batch),
2362 shard_id,
2363 adapter_timeout,
2364 batch_retries,
2365 )
2366 .await
2367 {
2368 stats
2369 .batches_dispatched
2370 .fetch_add(1, AtomicOrdering::Relaxed);
2371 stats
2372 .events_dispatched
2373 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2374 // PERF_AUDIT §1.4 — lock-free counter
2375 // bump on the per-shard `ShardCounters`
2376 // (parallel `Vec` exposed by
2377 // `ShardManager`). Pre-fix this
2378 // `shard_ref.lock().record_batch_dispatch()`
2379 // took the producer-hot shard mutex just
2380 // to bump an atomic, injecting periodic
2381 // latency spikes into the ingest path.
2382 shard_manager.record_batch_dispatch(shard_id);
2383 }
2384 }
2385 }
2386 Ok(None) => {
2387 // Channel closed — drain any pending and exit.
2388 if worker.has_pending() {
2389 let batch = worker.flush();
2390 if !batch.is_empty() {
2391 let batch_len = batch.len() as u64;
2392 if dispatch_batch(
2393 &*adapter,
2394 Arc::new(batch),
2395 shard_id,
2396 adapter_timeout,
2397 batch_retries,
2398 )
2399 .await
2400 {
2401 stats
2402 .batches_dispatched
2403 .fetch_add(1, AtomicOrdering::Relaxed);
2404 stats
2405 .events_dispatched
2406 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2407 }
2408 }
2409 }
2410 break;
2411 }
2412 Err(_) => {
2413 // Timeout - check if we need to flush.
2414 // Pre-fix [perf #38] called `worker.add_events(vec![])`
2415 // as the signal, allocating an empty `Vec` per
2416 // timeout tick. `check_timeout` is the direct
2417 // expression of intent — no alloc.
2418 if let Some(batch) = worker.check_timeout() {
2419 let batch_len = batch.len() as u64;
2420 if dispatch_batch(
2421 &*adapter,
2422 Arc::new(batch),
2423 shard_id,
2424 adapter_timeout,
2425 batch_retries,
2426 )
2427 .await
2428 {
2429 stats
2430 .batches_dispatched
2431 .fetch_add(1, AtomicOrdering::Relaxed);
2432 stats
2433 .events_dispatched
2434 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2435 // PERF_AUDIT §1.4 — lock-free counter
2436 // bump on the per-shard `ShardCounters`
2437 // (parallel `Vec` exposed by
2438 // `ShardManager`). Pre-fix this
2439 // `shard_ref.lock().record_batch_dispatch()`
2440 // took the producer-hot shard mutex just
2441 // to bump an atomic, injecting periodic
2442 // latency spikes into the ingest path.
2443 shard_manager.record_batch_dispatch(shard_id);
2444 }
2445 }
2446 }
2447 }
2448 }
2449 })
2450}
2451
/// Upper bound on how long a drain worker, after observing
/// `shutdown == true`, waits for `drain_finalize_ready` before doing its
/// final ring-buffer sweep anyway.
///
/// This is a backstop. The gate is released (with `Release` ordering) by
/// `EventBus::shutdown` / `Drop`, so in a normal shutdown the deadline
/// should never be reached. It exists so that a bus dropped mid-shutdown
/// without the gate being set cannot pin its drain workers forever. If it
/// does fire, the worker logs a warning and proceeds, possibly losing
/// late events.
const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_millis(10_000);
2459
/// Spawn a drain worker for a single shard.
///
/// The worker repeatedly moves events out of the shard's ring buffer
/// (`ShardManager::with_shard` + `pop_batch_into`) and sends them, as
/// `Vec<InternalEvent>` chunks, to the shard's batch worker over `sender`.
///
/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle allocation
/// happens *outside* the shard mutex critical section. Each cycle:
/// lock → drain into scratch (no alloc, capacity already reserved) →
/// unlock → `mem::replace` swaps the filled scratch out for a fresh empty
/// `Vec` (alloc *outside* the lock) → send the filled batch on the channel.
///
/// Lifecycle:
/// * Steady state: pops up to `STEADY_BATCH` events per cycle. When none
///   are available it sleeps briefly and polls again.
/// * Shutdown: once `shutdown` reads `true`, it waits (up to
///   [`DRAIN_FINALIZE_TIMEOUT`]) for `drain_finalize_ready`. It then sweeps
///   the ring buffer in `FINAL_BATCH`-sized chunks until two consecutive
///   passes come back empty, and exits.
/// * It also exits if, in steady state, the shard no longer exists
///   (`with_shard` returns `None`). It exits as well if the batch worker's
///   receiver is gone; the in-hand chunk is then lost and logged.
///
/// When the task returns, `sender` is dropped. That is one of the senders
/// that must go away before the batch worker observes channel close.
fn spawn_drain_worker_for_shard(
    shard_id: u16,
    shard_manager: Arc<ShardManager>,
    sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
    shutdown: Arc<AtomicBool>,
    drain_finalize_ready: Arc<AtomicBool>,
) -> JoinHandle<()> {
    // Max events moved per steady-state cycle / per final-sweep pass.
    const STEADY_BATCH: usize = 1_000;
    const FINAL_BATCH: usize = 10_000;

    tokio::spawn(async move {
        let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);

        loop {
            // SeqCst to match the writer side (`EventBus::shutdown` /
            // `Drop`). `try_enter_ingest` itself uses SeqCst, and
            // the Acquire/Release handshake on
            // `drain_finalize_ready` (below) is what actually makes
            // the producer-push happen-before visible. Aligning to
            // SeqCst here makes the contract robust to future
            // producer-side changes that might piggyback on
            // `shutdown`'s ordering.
            if shutdown.load(AtomicOrdering::SeqCst) {
                // Before doing the final sweep, wait for `shutdown()`
                // to release the finalize gate. The gate is set only
                // after the in-flight ingest counter reaches zero,
                // which means every producer that read `shutdown=false`
                // has completed its push. Without this wait, the drain
                // worker can race ahead of a late push under
                // shard-mutex serialization (drain takes the lock
                // first, sees nothing, exits; producer then takes the
                // lock and pushes — event stranded).
                //
                // Acquire pairs with the Release in `EventBus::shutdown`
                // and `EventBus::drop`, transitively making every
                // producer push that happened-before its `in_flight`
                // decrement visible to the subsequent `pop_batch_into`.
                // `tokio::time::Instant` so virtualized-clock tests
                // (`tokio::time::pause`) advance the deadline via
                // `sleep` rather than spinning until wall-clock catches
                // up.
                let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
                while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
                    if tokio::time::Instant::now() >= finalize_deadline {
                        // Backstop: give up waiting and sweep anyway.
                        tracing::warn!(
                            shard_id,
                            "drain worker timed out waiting for finalize gate; \
                             proceeding with potential event loss"
                        );
                        break;
                    }
                    // Park instead of `yield_now` so we don't
                    // starve the workers / producers we're waiting
                    // on under contention.
                    tokio::time::sleep(std::time::Duration::from_millis(1)).await;
                }

                // Final drain: loop until the ring buffer is empty.
                // A single 10k batch is not enough — the ring
                // buffer can hold up to `ring_buffer_capacity`
                // events (default 1M) and any leftover would be
                // silently lost on shutdown.
                //
                // Stop only after TWO consecutive zero-event passes,
                // with a `yield_now` between them. This is cheap
                // defense against a narrow race: a producer that
                // fetch_add'd in_flight_ingests but stalled before the
                // shard-lock body could push AFTER shutdown observed
                // in_flight=0 yet BEFORE this sweep saw the event. The
                // SeqCst guard pattern makes that unlikely (the push
                // happens-before the guard drop); the yield gives such a
                // stalled producer a chance to land its push. (Earlier
                // code stopped at the first `popped == 0`.)
                //
                // A missing shard (`with_shard` → `None`) counts as an
                // empty pass here via `unwrap_or(0)`.
                let mut final_scratch: Vec<crate::event::InternalEvent> =
                    Vec::with_capacity(FINAL_BATCH);
                let mut consecutive_zeros = 0u32;
                loop {
                    let popped = shard_manager
                        .with_shard(shard_id, |shard| {
                            shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
                        })
                        .unwrap_or(0);
                    if popped == 0 {
                        consecutive_zeros += 1;
                        if consecutive_zeros >= 2 {
                            break;
                        }
                        // Yield to let any racing producer commit
                        // its push, then re-poll.
                        tokio::task::yield_now().await;
                        continue;
                    }
                    consecutive_zeros = 0;
                    let batch =
                        std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
                    let batch_len = batch.len();
                    if let Err(_send_err) = sender.send(batch).await {
                        // The batch worker exited before the drain finished.
                        // The `mem::replace` already pulled these events out
                        // of the ring buffer, so the dropped batch cannot be
                        // recovered: the SendError hands it back, but nothing
                        // is left to consume it. Log the count loudly so the
                        // loss shows up on operator dashboards instead of as
                        // a silent miss in shutdown stats.
                        tracing::error!(
                            shard_id,
                            dropped = batch_len,
                            "drain worker (final): batch worker dropped \
                             channel before final drain completed; \
                             events removed from ring buffer cannot be redelivered",
                        );
                        break;
                    }
                }
                // Final sweep done (or aborted): the worker exits.
                break;
            }

            // Steady state: drain up to STEADY_BATCH events from the
            // ring buffer. `None` means the shard no longer exists.
            let popped = shard_manager.with_shard(shard_id, |shard| {
                shard.pop_batch_into(&mut scratch, STEADY_BATCH)
            });

            match popped {
                Some(0) => {
                    // No events: back off briefly before polling again. A
                    // short backoff is deliberate. The drain loop is the
                    // hot path of a latency-first system, and a longer
                    // backoff would add latency to the first event after a
                    // quiet period.
                    //
                    // NOTE(review): tokio's `sleep` works at millisecond
                    // granularity, so the requested 100µs is rounded up by
                    // the timer. In practice this parks for roughly 1ms (or
                    // more on platforms with coarser timers, e.g. Windows),
                    // not 100µs. The idle poll rate and first-event latency
                    // are therefore bounded by the timer tick, not by this
                    // constant. Confirm this matches the intended latency
                    // budget.
                    tokio::time::sleep(Duration::from_micros(100)).await;
                }
                Some(_) => {
                    let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
                    let batch_len = batch.len();
                    if let Err(_send_err) = sender.send(batch).await {
                        // In steady state the batch worker only drops the
                        // channel if it panicked, or if
                        // `remove_shard_internal` tore it down out of order
                        // with the drain worker (the documented shutdown
                        // sequence forbids that). Either way the events
                        // cannot be recovered: the `mem::replace` above
                        // already pulled them out of the ring buffer. Log a
                        // loud error with the dropped count so an
                        // out-of-order shutdown or a batch-worker panic
                        // shows up in dashboards rather than as a silent
                        // metric gap.
                        tracing::error!(
                            shard_id,
                            dropped = batch_len,
                            "drain worker: batch worker dropped channel \
                             during steady-state drain; events removed from \
                             ring buffer cannot be redelivered",
                        );
                        break;
                    }
                }
                None => {
                    // Shard no longer exists (was removed)
                    break;
                }
            }
        }
    })
}
2641
2642#[cfg(test)]
2643mod tests {
2644 use super::*;
2645 use crate::shard::ScalingPolicy;
2646 use serde_json::json;
2647
2648 /// PERF_AUDIT §1.1 — striped counter sums correctly across
2649 /// concurrent producers. Spawns 16 threads, each does 10_000
2650 /// add+sub on its sticky slot. After joins the sum must be 0
2651 /// (every add was paired with a sub), and slots must NOT all
2652 /// be 0 mid-flight (proving the stripe was actually used —
2653 /// not collapsed onto one slot by accident).
2654 #[test]
2655 fn striped_in_flight_sum_across_concurrent_producers() {
2656 let counter = std::sync::Arc::new(StripedInFlight::new());
2657 let mut handles = Vec::new();
2658 for _ in 0..16 {
2659 let c = counter.clone();
2660 handles.push(std::thread::spawn(move || {
2661 let slot = c.slot_for_current_thread();
2662 for _ in 0..10_000 {
2663 c.fetch_add_at(slot, 1);
2664 c.fetch_sub_at(slot, 1);
2665 }
2666 }));
2667 }
2668 for h in handles {
2669 h.join().unwrap();
2670 }
2671 assert_eq!(
2672 counter.sum(),
2673 0,
2674 "every add was paired with a sub: net sum must be zero"
2675 );
2676 }
2677
2678 /// PERF_AUDIT §1.1 — the stripe's accounting contract, pinned
2679 /// deterministically: every slot accumulates independently and
2680 /// `sum()` aggregates all of them. A regression collapsing the
2681 /// stripe onto one shared slot (the pre-§1.1 shape) aliases the
2682 /// per-slot counts and fails the per-slot assertions below.
2683 ///
2684 /// Deliberately does NOT assert how many distinct slots a set
2685 /// of threads lands on: the thread→slot mapping hashes
2686 /// `ThreadId`s, whose distribution std does not guarantee, so a
2687 /// "N distinct slots populated" assertion is nondeterministic
2688 /// (it can fail under legitimate thread-id collisions). The
2689 /// mapping's actual contract — in-bounds and sticky per thread
2690 /// — is asserted separately below.
2691 #[test]
2692 fn striped_in_flight_slots_account_independently() {
2693 let counter = StripedInFlight::new();
2694 // Distinct count per slot so aliasing between any two slots
2695 // changes at least one per-slot readback.
2696 for slot in 0..IN_FLIGHT_STRIPE_SLOTS {
2697 counter.fetch_add_at(slot, slot as u64 + 1);
2698 }
2699 let expected_total: u64 = (1..=IN_FLIGHT_STRIPE_SLOTS as u64).sum();
2700 assert_eq!(counter.sum(), expected_total, "sum must cover every slot");
2701 for slot in 0..IN_FLIGHT_STRIPE_SLOTS {
2702 assert_eq!(
2703 counter.slots[slot].load(AtomicOrdering::SeqCst),
2704 slot as u64 + 1,
2705 "slot {slot} must hold exactly its own count (no aliasing)"
2706 );
2707 }
2708 }
2709
2710 /// PERF_AUDIT §1.1 — thread→slot mapping contract: in-bounds
2711 /// for every thread, and sticky (repeat calls from the same
2712 /// thread return the same slot, so a producer's add and its
2713 /// guard's sub land on the same cache line).
2714 #[test]
2715 fn striped_in_flight_slot_mapping_is_in_bounds_and_sticky() {
2716 let counter = std::sync::Arc::new(StripedInFlight::new());
2717 let mut handles = Vec::new();
2718 for _ in 0..16 {
2719 let c = counter.clone();
2720 handles.push(std::thread::spawn(move || {
2721 let first = c.slot_for_current_thread();
2722 let second = c.slot_for_current_thread();
2723 assert!(first < IN_FLIGHT_STRIPE_SLOTS, "slot out of bounds");
2724 assert_eq!(first, second, "slot must be sticky per thread");
2725 }));
2726 }
2727 for h in handles {
2728 h.join().unwrap();
2729 }
2730 }
2731
2732 /// PERF_AUDIT §1.1 — the shutdown gate must survive the
2733 /// striping. After `shutdown_via_ref` completes, `ingest` must
2734 /// be rejected with `ShuttingDown`, AND the rejected attempt
2735 /// must leave the striped sum at exactly zero — the gate's
2736 /// add-check-sub sequence nets out on the producer's slot. A
2737 /// leaked increment here would skew the stranded-count
2738 /// accounting any later wait-for-zero reader relies on.
2739 #[tokio::test]
2740 async fn striped_in_flight_shutdown_gate_rejects_and_nets_zero() {
2741 let config = EventBusConfig::builder()
2742 .num_shards(2)
2743 .ring_buffer_capacity(1024)
2744 .build()
2745 .unwrap();
2746 let bus = EventBus::new(config).await.unwrap();
2747 bus.shutdown_via_ref().await.unwrap();
2748
2749 let err = bus.ingest(Event::new(json!({"late": true}))).unwrap_err();
2750 assert!(
2751 matches!(err, IngestionError::ShuttingDown),
2752 "post-shutdown ingest must fail closed, got {err:?}"
2753 );
2754 assert_eq!(
2755 bus.in_flight_ingests.sum(),
2756 0,
2757 "rejected gate entry must net-zero its stripe slot"
2758 );
2759 }
2760
2761 #[tokio::test]
2762 async fn test_event_bus_basic() {
2763 let config = EventBusConfig::builder()
2764 .num_shards(2)
2765 .ring_buffer_capacity(1024)
2766 .build()
2767 .unwrap();
2768
2769 let bus = EventBus::new(config).await.unwrap();
2770
2771 // Ingest some events
2772 for i in 0..10 {
2773 let event = Event::new(json!({"index": i}));
2774 bus.ingest(event).unwrap();
2775 }
2776
2777 // Give workers time to process
2778 tokio::time::sleep(Duration::from_millis(100)).await;
2779
2780 // Check stats
2781 assert_eq!(
2782 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2783 10
2784 );
2785
2786 bus.shutdown().await.unwrap();
2787 }
2788
2789 #[tokio::test]
2790 async fn test_event_bus_batch_ingest() {
2791 let config = EventBusConfig::default();
2792 let bus = EventBus::new(config).await.unwrap();
2793
2794 let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2795
2796 let ingested = bus.ingest_batch(events);
2797 assert_eq!(ingested, 100);
2798
2799 bus.shutdown().await.unwrap();
2800 }
2801
2802 #[tokio::test]
2803 async fn test_event_bus_with_dynamic_scaling() {
2804 let policy = ScalingPolicy {
2805 min_shards: 2,
2806 max_shards: 8,
2807 ..Default::default()
2808 };
2809
2810 let config = EventBusConfig::builder()
2811 .num_shards(2)
2812 .ring_buffer_capacity(1024)
2813 .scaling(policy)
2814 .build()
2815 .unwrap();
2816
2817 let bus = EventBus::new(config).await.unwrap();
2818
2819 // Verify dynamic scaling is enabled
2820 assert!(bus.is_dynamic_scaling_enabled());
2821 assert_eq!(bus.num_shards(), 2);
2822
2823 // Ingest some events
2824 for i in 0..100 {
2825 let event = Event::new(json!({"index": i}));
2826 bus.ingest(event).unwrap();
2827 }
2828
2829 // Give workers time to process
2830 tokio::time::sleep(Duration::from_millis(100)).await;
2831
2832 // Check stats
2833 assert_eq!(
2834 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2835 100
2836 );
2837
2838 bus.shutdown().await.unwrap();
2839 }
2840
2841 #[tokio::test]
2842 async fn test_manual_scale_up() {
2843 let policy = ScalingPolicy {
2844 min_shards: 2,
2845 max_shards: 8,
2846 cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2847 ..Default::default()
2848 };
2849
2850 let config = EventBusConfig::builder()
2851 .num_shards(2)
2852 .ring_buffer_capacity(1024)
2853 .scaling(policy)
2854 .build()
2855 .unwrap();
2856
2857 let bus = EventBus::new(config).await.unwrap();
2858
2859 assert_eq!(bus.num_shards(), 2);
2860
2861 // Manually scale up
2862 let new_ids = bus.manual_scale_up(2).await.unwrap();
2863 assert_eq!(new_ids.len(), 2);
2864 assert_eq!(bus.num_shards(), 4);
2865
2866 // Ingest events - they should be distributed across all shards
2867 for i in 0..100 {
2868 let event = Event::new(json!({"index": i}));
2869 bus.ingest(event).unwrap();
2870 }
2871
2872 tokio::time::sleep(Duration::from_millis(100)).await;
2873
2874 assert_eq!(
2875 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2876 100
2877 );
2878
2879 bus.shutdown().await.unwrap();
2880 }
2881
2882 /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2883 /// `manual_scale_down` only called `mapper.scale_down(count)`,
2884 /// which marks shards as `Draining` but does NOT finalize them.
2885 /// Bus configs without an active scaling monitor (or callers
2886 /// shutting down before the monitor's next tick) lost any
2887 /// events queued in the drained shards' ring buffers because
2888 /// `remove_shard_internal` was never invoked. The fix runs the
2889 /// full lifecycle synchronously: scale_down → poll for empty →
2890 /// finalize_draining → remove_shard_internal.
2891 ///
2892 /// We pin this by scaling up, manually scaling down, and
2893 /// asserting that `num_shards` actually decreases — pre-fix
2894 /// the count would still reflect the Draining shards.
2895 #[tokio::test]
2896 async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2897 let policy = ScalingPolicy {
2898 min_shards: 2,
2899 max_shards: 8,
2900 cooldown: Duration::from_nanos(1),
2901 ..Default::default()
2902 };
2903 let config = EventBusConfig::builder()
2904 .num_shards(2)
2905 .ring_buffer_capacity(1024)
2906 .scaling(policy)
2907 .build()
2908 .unwrap();
2909 let bus = EventBus::new(config).await.unwrap();
2910
2911 // Scale up to 4, then back down to 2.
2912 let added = bus.manual_scale_up(2).await.unwrap();
2913 assert_eq!(added.len(), 2);
2914 assert_eq!(bus.num_shards(), 4);
2915
2916 let removed = bus.manual_scale_down(2).await.unwrap();
2917 assert_eq!(
2918 removed.len(),
2919 2,
2920 "manual_scale_down must complete the lifecycle for both \
2921 requested shards (mark Draining → wait → finalize → remove)"
2922 );
2923
2924 // Pre-fix: `num_shards` would still be 4 because shards
2925 // were only marked Draining (and the routing-table removal
2926 // path never ran). Post-fix: it's back to 2.
2927 assert_eq!(
2928 bus.num_shards(),
2929 2,
2930 "drained shards must be removed from the routing table"
2931 );
2932
2933 bus.shutdown().await.unwrap();
2934 }
2935
2936 #[tokio::test]
2937 async fn test_shard_metrics() {
2938 let policy = ScalingPolicy::default();
2939
2940 let config = EventBusConfig::builder()
2941 .num_shards(2)
2942 .ring_buffer_capacity(1024)
2943 .scaling(policy)
2944 .build()
2945 .unwrap();
2946
2947 let bus = EventBus::new(config).await.unwrap();
2948
2949 // Ingest some events
2950 for i in 0..50 {
2951 let event = Event::new(json!({"index": i}));
2952 bus.ingest(event).unwrap();
2953 }
2954
2955 // Get metrics
2956 let metrics = bus.shard_metrics();
2957 assert!(metrics.is_some());
2958 let metrics = metrics.unwrap();
2959 assert_eq!(metrics.len(), 2);
2960
2961 bus.shutdown().await.unwrap();
2962 }
2963
2964 #[tokio::test]
2965 async fn test_regression_eventbus_drop_signals_shutdown() {
2966 // Regression: dropping an EventBus without calling shutdown() used to
2967 // leave background tasks running indefinitely. The Drop impl now sets
2968 // the shutdown flag so workers eventually exit.
2969 let result = tokio::time::timeout(Duration::from_secs(5), async {
2970 let config = EventBusConfig::builder()
2971 .num_shards(2)
2972 .ring_buffer_capacity(1024)
2973 .build()
2974 .unwrap();
2975
2976 let bus = EventBus::new(config).await.unwrap();
2977
2978 // Ingest some events
2979 for i in 0..10 {
2980 let event = Event::new(json!({"index": i}));
2981 bus.ingest(event).unwrap();
2982 }
2983
2984 // Drop without calling shutdown()
2985 drop(bus);
2986
2987 // If we reach here, the drop didn't hang
2988 })
2989 .await;
2990
2991 assert!(
2992 result.is_ok(),
2993 "EventBus drop should not hang — Drop impl must signal shutdown"
2994 );
2995 }
2996
2997 #[tokio::test]
2998 async fn test_with_dynamic_scaling_builder() {
2999 let config = EventBusConfig::builder()
3000 .num_shards(4)
3001 .ring_buffer_capacity(2048)
3002 .with_dynamic_scaling()
3003 .build()
3004 .unwrap();
3005
3006 let bus = EventBus::new(config).await.unwrap();
3007
3008 assert!(bus.is_dynamic_scaling_enabled());
3009 assert_eq!(bus.num_shards(), 4);
3010
3011 bus.shutdown().await.unwrap();
3012 }
3013
3014 /// Mock adapter that counts `on_batch` invocations and returns a
3015 /// configurable error variant. Used to assert dispatch retry
3016 /// semantics without dragging in a real adapter.
3017 struct CountingErrAdapter {
3018 calls: Arc<std::sync::atomic::AtomicU32>,
3019 make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
3020 }
3021
3022 #[async_trait::async_trait]
3023 impl crate::adapter::Adapter for CountingErrAdapter {
3024 async fn init(&mut self) -> Result<(), AdapterError> {
3025 Ok(())
3026 }
3027 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3028 self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3029 Err((self.make_err)())
3030 }
3031 async fn flush(&self) -> Result<(), AdapterError> {
3032 Ok(())
3033 }
3034 async fn shutdown(&self) -> Result<(), AdapterError> {
3035 Ok(())
3036 }
3037 async fn poll_shard(
3038 &self,
3039 _shard_id: u16,
3040 _from_id: Option<&str>,
3041 _limit: usize,
3042 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3043 Ok(crate::adapter::ShardPollResult::empty())
3044 }
3045 fn name(&self) -> &'static str {
3046 "counting_err"
3047 }
3048 async fn is_healthy(&self) -> bool {
3049 true
3050 }
3051 }
3052
3053 /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
3054 /// retried every error variant, ignoring `AdapterError::is_retryable`.
3055 /// A non-retryable error (Connection / Fatal / Serialization)
3056 /// should now drop the batch immediately rather than burn the
3057 /// retry budget on something that cannot succeed.
3058 #[tokio::test(start_paused = true)]
3059 async fn dispatch_batch_skips_retries_on_non_retryable_error() {
3060 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
3061 let adapter = CountingErrAdapter {
3062 calls: calls.clone(),
3063 make_err: Box::new(|| AdapterError::Connection("refused".into())),
3064 };
3065
3066 let batch = Batch::new(0, vec![], 0);
3067 let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 5).await;
3068
3069 assert!(!ok, "non-retryable error must drop batch");
3070 assert_eq!(
3071 calls.load(std::sync::atomic::Ordering::SeqCst),
3072 1,
3073 "Connection error must not be retried; expected exactly 1 on_batch call"
3074 );
3075 }
3076
3077 /// Sanity: a retryable error *does* go through the full retry
3078 /// budget. Without this companion check, the previous test could
3079 /// pass for the wrong reason (e.g. if dispatch always returned on
3080 /// the first error).
3081 #[tokio::test(start_paused = true)]
3082 async fn dispatch_batch_retries_transient_errors() {
3083 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
3084 let adapter = CountingErrAdapter {
3085 calls: calls.clone(),
3086 make_err: Box::new(|| AdapterError::Transient("temp".into())),
3087 };
3088
3089 let batch = Batch::new(0, vec![], 0);
3090 let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 3).await;
3091
3092 assert!(!ok);
3093 // 3 retries + 1 final attempt = 4 total calls.
3094 assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
3095 }
3096
3097 /// Pin the perf contract behind the `Arc<Batch>` change.
3098 ///
3099 /// Pre-fix `dispatch_batch` deep-cloned the events `Vec` on every
3100 /// retry attempt (and the comment in production code already
3101 /// admitted this was wasted on the common `retries == 0` path).
3102 /// Post-fix the dispatcher takes `Arc<Batch>` and each retry is a
3103 /// refcount bump.
3104 ///
3105 /// We pin the contract observationally: an adapter that fails 3
3106 /// times then succeeds receives the *same* `Arc<Batch>` pointer
3107 /// on every call. A regression that ever rebuilt the Batch
3108 /// (e.g. a future refactor that does `Arc::new(batch.as_ref().clone())`)
3109 /// would surface here as distinct `Arc::as_ptr` values.
3110 #[tokio::test(start_paused = true)]
3111 async fn dispatch_batch_retries_share_the_same_arc_allocation() {
3112 // Identity-record `Arc::as_ptr(&batch)` per call via the
3113 // batch's strong-count snapshot. We cast to `usize` so the
3114 // adapter can stay `Send + Sync` without an unsafe impl —
3115 // the value is just an opaque identity stamp.
3116 struct PointerRecordingAdapter {
3117 seen: Arc<parking_lot::Mutex<Vec<usize>>>,
3118 fail_first_n: u32,
3119 calls: Arc<std::sync::atomic::AtomicU32>,
3120 }
3121
3122 #[async_trait::async_trait]
3123 impl crate::adapter::Adapter for PointerRecordingAdapter {
3124 async fn init(&mut self) -> Result<(), AdapterError> {
3125 Ok(())
3126 }
3127 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3128 self.seen.lock().push(Arc::as_ptr(&batch) as usize);
3129 let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
3130 if n < self.fail_first_n {
3131 Err(AdapterError::Transient("retry me".into()))
3132 } else {
3133 Ok(())
3134 }
3135 }
3136 async fn flush(&self) -> Result<(), AdapterError> {
3137 Ok(())
3138 }
3139 async fn shutdown(&self) -> Result<(), AdapterError> {
3140 Ok(())
3141 }
3142 async fn poll_shard(
3143 &self,
3144 _shard_id: u16,
3145 _from_id: Option<&str>,
3146 _limit: usize,
3147 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3148 Ok(crate::adapter::ShardPollResult::empty())
3149 }
3150 fn name(&self) -> &'static str {
3151 "pointer-recording"
3152 }
3153 }
3154
3155 let seen = Arc::new(parking_lot::Mutex::new(Vec::new()));
3156 let adapter = PointerRecordingAdapter {
3157 seen: Arc::clone(&seen),
3158 fail_first_n: 3,
3159 calls: Arc::new(std::sync::atomic::AtomicU32::new(0)),
3160 };
3161 let batch = Arc::new(Batch::new(0, vec![], 0));
3162 let original_ptr = Arc::as_ptr(&batch) as usize;
3163
3164 let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
3165 assert!(ok, "adapter accepts on 4th try; dispatch must succeed");
3166
3167 let pointers = seen.lock().clone();
3168 assert_eq!(
3169 pointers.len(),
3170 4,
3171 "expected 3 failures + 1 success = 4 on_batch calls",
3172 );
3173 // Every call observed the SAME Arc target. A regression that
3174 // deep-clones would have produced four distinct pointers.
3175 for (i, p) in pointers.iter().enumerate() {
3176 assert_eq!(
3177 *p, original_ptr,
3178 "attempt {i} saw a different Arc<Batch>; dispatch_batch deep-cloned",
3179 );
3180 }
3181 }
3182
3183 /// Counting adapter that records the number of events delivered via
3184 /// `on_batch`. Used by shutdown-durability tests below.
3185 struct CountingAdapter {
3186 received: Arc<AtomicU64>,
3187 }
3188
3189 #[async_trait::async_trait]
3190 impl crate::adapter::Adapter for CountingAdapter {
3191 async fn init(&mut self) -> Result<(), AdapterError> {
3192 Ok(())
3193 }
3194 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3195 self.received
3196 .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
3197 Ok(())
3198 }
3199 async fn flush(&self) -> Result<(), AdapterError> {
3200 Ok(())
3201 }
3202 async fn shutdown(&self) -> Result<(), AdapterError> {
3203 Ok(())
3204 }
3205 async fn poll_shard(
3206 &self,
3207 _shard_id: u16,
3208 _from_id: Option<&str>,
3209 _limit: usize,
3210 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3211 Ok(crate::adapter::ShardPollResult::empty())
3212 }
3213 fn name(&self) -> &'static str {
3214 "counting"
3215 }
3216 async fn is_healthy(&self) -> bool {
3217 true
3218 }
3219 }
3220
/// `retry_backoff` exponentially grows the base delay per attempt and
/// adds per-(shard, attempt) jitter to decorrelate retries across
/// shards. Pin both invariants:
///
/// - every attempt from 0 up to the cap lands in its own ±25% jitter
///   band around the expected base, and attempts past the cap stay in
///   the capped band. The bands are disjoint, so this also pins
///   monotonic growth of the base for attempts 0..=5;
/// - jitter produces different outputs for different shard ids.
#[test]
fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
    // Shard 0, attempts 0..=6: base ms 100, 200, 400, 800, 1600, 3200,
    // 3200 (the exponent caps at attempt 5), plus ±25% jitter. With
    // `as_millis` truncation each observed value must lie in
    // `[base * 3/4, base * 5/4]`. Adjacent bands are disjoint
    // (125 < 150, 250 < 300, ...), so passing every band check implies
    // strictly increasing backoffs up to the cap. Attempts 2 and 3 were
    // previously unchecked, leaving a hole in the growth guarantee, and
    // the capped attempt was only bounded from above.
    let cases = [
        (0, 100u128),
        (1, 200),
        (2, 400),
        (3, 800),
        (4, 1600),
        (5, 3200),
        // Past the cap: must stay in the attempt-5 band — neither
        // growing further nor collapsing below it.
        (6, 3200),
    ];
    for (attempt, base) in cases {
        let got = retry_backoff(0, attempt).as_millis();
        let (lo, hi) = (base * 3 / 4, base * 5 / 4);
        assert!(
            (lo..=hi).contains(&got),
            "attempt {attempt}: expected backoff in [{lo}, {hi}]ms \
             (base {base}ms ±25%), got {got}ms"
        );
    }

    // Jitter property: different shards at the same attempt land on
    // different backoffs. Sample 16 distinct shard ids and assert at
    // least 4 unique backoff values.
    //
    // The bound is deliberately loose (4 / 16) because `DefaultHasher`'s
    // exact distribution is **not stable** across Rust toolchain
    // versions — a tighter check (e.g. ≥ 8) would empirically pass today,
    // but a future stdlib hasher change could shift the distribution and
    // flake CI for a property (decorrelation across shards) that doesn't
    // depend on a high collision-resistance bar. ≥ 4 unique values out of
    // 16 still catches a real regression (e.g. hashing only `attempt` and
    // not `shard_id` would collapse all 16 to a single value).
    use std::collections::HashSet;
    let s_attempt2: HashSet<u128> = (0u16..16)
        .map(|s| retry_backoff(s, 2).as_millis())
        .collect();
    assert!(
        s_attempt2.len() >= 4,
        "jitter must decorrelate retries across shards; \
         only {} unique backoffs across 16 shards",
        s_attempt2.len()
    );
}
3277
3278 /// CR-23: pin that `EventBus::shutdown` actually invokes the
3279 /// adapter's `flush()` and `shutdown()` methods. The existing
3280 /// `sdk/tests/shutdown_regression.rs` covers the
3281 /// "shutdown runs even with outstanding Arc clones" property
3282 /// using a memory adapter whose `flush`/`shutdown` are no-ops
3283 /// — so a regression that elided the adapter calls would still
3284 /// pass. This test uses a recording adapter that increments
3285 /// per-method counters; we assert flush AND shutdown both fired
3286 /// exactly once after a clean `bus.shutdown().await`.
3287 ///
3288 /// The fix routes `Net::shutdown` through
3289 /// `shutdown_via_ref`, which in turn calls
3290 /// `self.adapter.flush()` and `self.adapter.shutdown()` once
3291 /// each. CR-23 pins this contract at the bus layer so an
3292 /// inadvertent regression at the SDK or adapter wrapper layer
3293 /// can be caught without an integration setup.
3294 #[tokio::test]
3295 async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
3296 struct RecordingAdapter {
3297 on_batch_calls: Arc<AtomicU64>,
3298 flush_calls: Arc<AtomicU64>,
3299 shutdown_calls: Arc<AtomicU64>,
3300 }
3301
3302 #[async_trait::async_trait]
3303 impl crate::adapter::Adapter for RecordingAdapter {
3304 async fn init(&mut self) -> Result<(), AdapterError> {
3305 Ok(())
3306 }
3307 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3308 self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
3309 Ok(())
3310 }
3311 async fn flush(&self) -> Result<(), AdapterError> {
3312 self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
3313 Ok(())
3314 }
3315 async fn shutdown(&self) -> Result<(), AdapterError> {
3316 self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
3317 Ok(())
3318 }
3319 async fn poll_shard(
3320 &self,
3321 _shard_id: u16,
3322 _from_id: Option<&str>,
3323 _limit: usize,
3324 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3325 Ok(crate::adapter::ShardPollResult::empty())
3326 }
3327 fn name(&self) -> &'static str {
3328 "cr23-recording"
3329 }
3330 async fn is_healthy(&self) -> bool {
3331 true
3332 }
3333 }
3334
3335 let on_batch = Arc::new(AtomicU64::new(0));
3336 let flush = Arc::new(AtomicU64::new(0));
3337 let shutdown = Arc::new(AtomicU64::new(0));
3338 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
3339 on_batch_calls: on_batch.clone(),
3340 flush_calls: flush.clone(),
3341 shutdown_calls: shutdown.clone(),
3342 });
3343
3344 let config = EventBusConfig::builder()
3345 .num_shards(2)
3346 .ring_buffer_capacity(1024)
3347 .build()
3348 .unwrap();
3349 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3350
3351 // Drive a small burst so on_batch fires at least once —
3352 // pins that the adapter is wired up correctly. The
3353 // load-bearing assertions below are on flush and shutdown.
3354 for i in 0..16 {
3355 let _ = bus.ingest(Event::new(json!({"i": i})));
3356 }
3357
3358 // Pre-CR-23 a regression that elided one of these would
3359 // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3360 // because the memory adapter's flush/shutdown are no-ops.
3361 // Here the recording adapter makes the contract observable.
3362 bus.shutdown().await.unwrap();
3363
3364 assert!(
3365 on_batch.load(AtomicOrdering::SeqCst) > 0,
3366 "sanity: on_batch must have fired at least once"
3367 );
3368 assert_eq!(
3369 flush.load(AtomicOrdering::SeqCst),
3370 1,
3371 "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3372 );
3373 assert_eq!(
3374 shutdown.load(AtomicOrdering::SeqCst),
3375 1,
3376 "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3377 );
3378 }
3379
3380 /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3381 /// CAS loses (because a first caller already flipped the
3382 /// `shutdown` flag) and whose deadline elapses BEFORE the
3383 /// first caller sets `shutdown_completed=true` returns
3384 /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3385 ///
3386 /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3387 /// the CAS race had no way to distinguish "first caller
3388 /// finished shutdown" from "deadline timed out mid-shutdown."
3389 /// Under a slow adapter (`adapter_timeout` default 30s >
3390 /// the 10s spin deadline), the second caller silently saw
3391 /// `Ok` while the bus was still mid-shutdown — letting
3392 /// subsequent code observe a partially-shut-down bus.
3393 ///
3394 /// We use a slow first caller (sleeps inside `flush()`)
3395 /// and override the spin deadline to a few ms so the test
3396 /// runs fast.
3397 #[tokio::test]
3398 async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3399 struct SlowFlushAdapter {
3400 // Block flush() for this long. The first caller
3401 // gets stuck here while the second caller's spin
3402 // deadline elapses.
3403 flush_delay: std::time::Duration,
3404 }
3405
3406 #[async_trait::async_trait]
3407 impl crate::adapter::Adapter for SlowFlushAdapter {
3408 async fn init(&mut self) -> Result<(), AdapterError> {
3409 Ok(())
3410 }
3411 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3412 Ok(())
3413 }
3414 async fn flush(&self) -> Result<(), AdapterError> {
3415 tokio::time::sleep(self.flush_delay).await;
3416 Ok(())
3417 }
3418 async fn shutdown(&self) -> Result<(), AdapterError> {
3419 Ok(())
3420 }
3421 async fn poll_shard(
3422 &self,
3423 _shard_id: u16,
3424 _from_id: Option<&str>,
3425 _limit: usize,
3426 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3427 Ok(crate::adapter::ShardPollResult::empty())
3428 }
3429 fn name(&self) -> &'static str {
3430 "cr25-slow-flush"
3431 }
3432 async fn is_healthy(&self) -> bool {
3433 true
3434 }
3435 }
3436
3437 // Cubic P2: serialize access to the global deadline
3438 // override so concurrent tests don't interfere. Hold the
3439 // guard until the override is reset to 0 below.
3440 let _override_guard = super::shutdown_deadline_override_lock().await;
3441
3442 // Override the second-caller spin deadline to a short
3443 // value so the test doesn't wall-clock-wait 10s. Production
3444 // builds use the 10s default (the cfg(test) override is
3445 // compiled out).
3446 super::set_shutdown_via_ref_spin_deadline_for_test(50);
3447
3448 let config = EventBusConfig::builder()
3449 .num_shards(2)
3450 .ring_buffer_capacity(1024)
3451 .adapter_timeout(std::time::Duration::from_secs(5))
3452 .build()
3453 .unwrap();
3454 // First caller's flush() sleeps 500ms — far longer than
3455 // the 50ms spin deadline.
3456 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3457 flush_delay: std::time::Duration::from_millis(500),
3458 });
3459 let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3460
3461 // Spawn the FIRST caller — it wins the CAS and proceeds
3462 // into the slow flush. We don't await it; we want it
3463 // running in parallel.
3464 let bus_first = Arc::clone(&bus);
3465 let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3466
3467 // Cubic P2: poll `is_shutdown()` until the first caller
3468 // has set the flag, instead of a fixed sleep. This makes
3469 // the test scheduler-independent — we proceed as soon as
3470 // the first caller has won the CAS, regardless of how
3471 // tokio happens to schedule things. Bounded by a 1s
3472 // timeout so a regression that prevents `shutdown_via_ref`
3473 // from setting the flag doesn't hang the test.
3474 tokio::time::timeout(std::time::Duration::from_secs(1), async {
3475 while !bus.is_shutdown() {
3476 tokio::task::yield_now().await;
3477 }
3478 })
3479 .await
3480 .expect("first caller did not set shutdown flag within 1s");
3481
3482 // The second caller's CAS will fail; it enters the spin
3483 // and times out at 50ms.
3484 let start = std::time::Instant::now();
3485 let second_result = bus.shutdown_via_ref().await;
3486 let elapsed = start.elapsed();
3487
3488 // Reset the override for other tests, then drop the guard.
3489 super::set_shutdown_via_ref_spin_deadline_for_test(0);
3490
3491 // CR-25 contract: the second caller MUST get a Transient
3492 // error, not a silent Ok.
3493 let err = second_result.expect_err(
3494 "CR-25 regression: second caller MUST surface AdapterError::Transient \
3495 when its deadline elapses, NOT a silent Ok",
3496 );
3497 match err {
3498 AdapterError::Transient(msg) => {
3499 assert!(
3500 msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3501 "error message must reference the deadline path; got: {msg}"
3502 );
3503 }
3504 other => panic!("expected Transient, got {:?}", other),
3505 }
3506
3507 // Sanity: the second caller's elapsed time is bounded by
3508 // the override (50ms) + scheduler slop. If this is
3509 // anywhere near the production 10s, the cfg(test)
3510 // override path broke.
3511 assert!(
3512 elapsed < std::time::Duration::from_secs(5),
3513 "second caller took {elapsed:?}; the cfg(test) deadline override \
3514 broke if this is near the 10s production default"
3515 );
3516
3517 // Wait for the first caller to finish. Even though it
3518 // took the slow path, the bus IS shutting down and will
3519 // eventually complete.
3520 let _ = first_caller.await.unwrap();
3521 assert!(bus.is_shutdown_completed());
3522 }
3523
3524 /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3525 /// successfully-ingested event to the adapter before returning.
3526 /// Pins the broader durability contract that the
3527 /// `drain_finalize_ready` gate supports: the drain worker may not
3528 /// finalize until the in-flight wait completes.
3529 ///
3530 /// Tests across many shards with bursts large enough that the
3531 /// drain workers are mid-loop when shutdown begins.
3532 #[tokio::test]
3533 async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3534 let received = Arc::new(AtomicU64::new(0));
3535 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3536 received: received.clone(),
3537 });
3538
3539 let config = EventBusConfig::builder()
3540 .num_shards(4)
3541 .ring_buffer_capacity(4096)
3542 .build()
3543 .unwrap();
3544 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3545
3546 // Drive a sizable burst across all shards. Capacity > burst so
3547 // we don't trip backpressure; every successful Ok must reach
3548 // `on_batch` before shutdown returns.
3549 let total = 10_000usize;
3550 let mut successes = 0u64;
3551 for i in 0..total {
3552 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3553 successes += 1;
3554 }
3555 }
3556
3557 // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3558 // those workers wait on `drain_finalize_ready` after observing
3559 // `shutdown=true`, so any push the producer made before the
3560 // shutdown flag is guaranteed to be in the ring buffer when
3561 // the final sweep runs.
3562 bus.shutdown().await.unwrap();
3563
3564 let delivered = received.load(AtomicOrdering::SeqCst);
3565 assert_eq!(
3566 delivered, successes,
3567 "shutdown stranded events: {successes} ingested successfully, \
3568 only {delivered} reached the adapter"
3569 );
3570 }
3571
3572 /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3573 /// barrier: after it returns successfully, every event the
3574 /// caller successfully ingested before `flush()` was called
3575 /// must have been handed to the adapter via `on_batch`.
3576 /// The previous implementation slept a single `batch.max_delay`
3577 /// after the ring buffers drained, which left a window where
3578 /// events could still be sitting in the per-shard mpsc channel
3579 /// or inside a partially-filled batch awaiting timeout — those
3580 /// events were silently dropped from the flush guarantee.
3581 #[tokio::test]
3582 async fn flush_is_a_delivery_barrier() {
3583 let received = Arc::new(AtomicU64::new(0));
3584 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3585 received: received.clone(),
3586 });
3587
3588 // Use a deliberately *long* batch.max_delay (250ms) so that a
3589 // partially-filled batch sitting in the batch worker's
3590 // pending state would survive past the old single-`max_delay`
3591 // sleep. min_size > burst forces the partial-batch path.
3592 let config = EventBusConfig::builder()
3593 .num_shards(2)
3594 .ring_buffer_capacity(1024)
3595 .batch(crate::config::BatchConfig {
3596 min_size: 1_000,
3597 max_size: 10_000,
3598 max_delay: Duration::from_millis(250),
3599 adaptive: false,
3600 velocity_window: Duration::from_millis(100),
3601 })
3602 .build()
3603 .unwrap();
3604 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3605
3606 // A small burst — far below `min_size`, so the batch worker
3607 // will sit on a partial batch waiting for `max_delay`.
3608 let burst = 50usize;
3609 let mut successes = 0u64;
3610 for i in 0..burst {
3611 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3612 successes += 1;
3613 }
3614 }
3615
3616 // Time the flush call to confirm we waited long enough for
3617 // the partial batch to time out. The previous code slept
3618 // ~10ms total in the post-empty phase; the fix waits up to
3619 // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3620 let t0 = std::time::Instant::now();
3621 bus.flush().await.unwrap();
3622 let elapsed = t0.elapsed();
3623
3624 // After flush returns, every successful ingest must have
3625 // been delivered to the adapter. With the old code this
3626 // assertion would fail: events sit in the partial batch
3627 // until `max_delay` (250ms) elapses, but flush returned
3628 // after only ~10ms.
3629 let delivered = received.load(AtomicOrdering::SeqCst);
3630 assert_eq!(
3631 delivered, successes,
3632 "flush() returned but only {delivered} of {successes} \
3633 events reached the adapter (#16); flush waited {:?}",
3634 elapsed
3635 );
3636
3637 bus.shutdown().await.unwrap();
3638 }
3639
3640 /// Regression (Phase 1): when configured with a
3641 /// persistent `producer_nonce_path`, two bus instances launched
3642 /// against the same path stamp the SAME nonce on every emitted
3643 /// batch. JetStream / Redis adapters key dedup on this nonce, so
3644 /// a producer that crashed mid-batch and restarted (within the
3645 /// backend's dedup window) issues retries with the same msg-ids
3646 /// and the backend correctly recognizes them as duplicates.
3647 ///
3648 /// Pre-fix the per-process nonce regenerated on every startup,
3649 /// so post-crash retries wrote NEW msg-ids and the backend
3650 /// persisted the partial-batch's accepted half twice.
3651 #[tokio::test]
3652 async fn persistent_producer_nonce_survives_bus_restart() {
3653 // Use a per-test temp file so concurrent runs don't collide.
3654 let mut nonce_path = std::env::temp_dir();
3655 let pid = std::process::id();
3656 let nanos = std::time::SystemTime::now()
3657 .duration_since(std::time::UNIX_EPOCH)
3658 .map(|d| d.as_nanos())
3659 .unwrap_or(0);
3660 nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3661
3662 let make_config = |path: &std::path::Path| {
3663 EventBusConfig::builder()
3664 .num_shards(1)
3665 .ring_buffer_capacity(1024)
3666 .producer_nonce_path(path)
3667 .build()
3668 .unwrap()
3669 };
3670
3671 // First bus: ingest one event. Read its nonce off the
3672 // adapter-bound batch via a recording adapter.
3673 struct NonceRecordingAdapter {
3674 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3675 }
3676 #[async_trait::async_trait]
3677 impl crate::adapter::Adapter for NonceRecordingAdapter {
3678 async fn init(&mut self) -> Result<(), AdapterError> {
3679 Ok(())
3680 }
3681 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3682 *self.nonce.lock() = Some(batch.process_nonce);
3683 Ok(())
3684 }
3685 async fn flush(&self) -> Result<(), AdapterError> {
3686 Ok(())
3687 }
3688 async fn shutdown(&self) -> Result<(), AdapterError> {
3689 Ok(())
3690 }
3691 async fn poll_shard(
3692 &self,
3693 _: u16,
3694 _: Option<&str>,
3695 _: usize,
3696 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3697 Ok(crate::adapter::ShardPollResult::empty())
3698 }
3699 fn name(&self) -> &'static str {
3700 "nonce-recording"
3701 }
3702 }
3703
3704 let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3705 {
3706 let bus = EventBus::new_with_adapter(
3707 make_config(&nonce_path),
3708 Box::new(NonceRecordingAdapter {
3709 nonce: nonce_first_run.clone(),
3710 }),
3711 )
3712 .await
3713 .unwrap();
3714 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3715 bus.flush().await.unwrap();
3716 bus.shutdown().await.unwrap();
3717 }
3718
3719 let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3720 {
3721 let bus = EventBus::new_with_adapter(
3722 make_config(&nonce_path),
3723 Box::new(NonceRecordingAdapter {
3724 nonce: nonce_second_run.clone(),
3725 }),
3726 )
3727 .await
3728 .unwrap();
3729 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3730 bus.flush().await.unwrap();
3731 bus.shutdown().await.unwrap();
3732 }
3733
3734 let n_a = nonce_first_run
3735 .lock()
3736 .expect("first bus must have dispatched a batch");
3737 let n_b = nonce_second_run
3738 .lock()
3739 .expect("second bus must have dispatched a batch");
3740 assert_eq!(
3741 n_a, n_b,
3742 "two bus instances against the same producer_nonce_path \
3743 must stamp the same nonce — pre-fix this regenerated on \
3744 every restart and JetStream's dedup window saw new \
3745 msg-ids as fresh batches",
3746 );
3747
3748 // Cleanup.
3749 let _ = std::fs::remove_file(&nonce_path);
3750 }
3751
3752 /// Pin that ALL spawn sites — both the static initial-shard
3753 /// loop in `new_with_adapter` and the dynamic-add path in
3754 /// `add_shard_internal` — clone the bus's loaded
3755 /// `producer_nonce` correctly. Pre-#56 there was no nonce
3756 /// concept at the bus layer; if any future refactor drops the
3757 /// `producer_nonce: self.producer_nonce` line from one of the
3758 /// spawn sites (or stops loading the persistent path), the
3759 /// post-scale-up shard's batches would carry a different nonce
3760 /// and JetStream's cross-restart dedup would silently break for
3761 /// events ingested into the dynamic shard. Pin all observed
3762 /// batches across the static + dynamic shards share the bus's
3763 /// nonce.
3764 #[tokio::test]
3765 async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3766 let mut nonce_path = std::env::temp_dir();
3767 let pid = std::process::id();
3768 let nanos = std::time::SystemTime::now()
3769 .duration_since(std::time::UNIX_EPOCH)
3770 .map(|d| d.as_nanos())
3771 .unwrap_or(0);
3772 nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3773
3774 struct CollectingAdapter {
3775 nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3776 }
3777 #[async_trait::async_trait]
3778 impl crate::adapter::Adapter for CollectingAdapter {
3779 async fn init(&mut self) -> Result<(), AdapterError> {
3780 Ok(())
3781 }
3782 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3783 self.nonces.lock().push(batch.process_nonce);
3784 Ok(())
3785 }
3786 async fn flush(&self) -> Result<(), AdapterError> {
3787 Ok(())
3788 }
3789 async fn shutdown(&self) -> Result<(), AdapterError> {
3790 Ok(())
3791 }
3792 async fn poll_shard(
3793 &self,
3794 _: u16,
3795 _: Option<&str>,
3796 _: usize,
3797 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3798 Ok(crate::adapter::ShardPollResult::empty())
3799 }
3800 fn name(&self) -> &'static str {
3801 "collecting"
3802 }
3803 }
3804
3805 let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3806 let policy = ScalingPolicy {
3807 min_shards: 1,
3808 max_shards: 8,
3809 cooldown: Duration::from_nanos(1),
3810 ..Default::default()
3811 };
3812 let config = EventBusConfig::builder()
3813 .num_shards(2)
3814 .ring_buffer_capacity(1024)
3815 .scaling(policy)
3816 .producer_nonce_path(&nonce_path)
3817 .build()
3818 .unwrap();
3819
3820 let bus = EventBus::new_with_adapter(
3821 config,
3822 Box::new(CollectingAdapter {
3823 nonces: nonces.clone(),
3824 }),
3825 )
3826 .await
3827 .unwrap();
3828
3829 // Drive the two static shards.
3830 for i in 0..200u64 {
3831 let _ = bus.ingest(Event::new(json!({"i": i})));
3832 }
3833 bus.flush().await.unwrap();
3834
3835 // Add a dynamic shard and drive it too.
3836 let _ = bus.manual_scale_up(1).await.unwrap();
3837 for i in 200..400u64 {
3838 let _ = bus.ingest(Event::new(json!({"i": i})));
3839 }
3840 bus.flush().await.unwrap();
3841
3842 bus.shutdown().await.unwrap();
3843
3844 let observed = nonces.lock().clone();
3845 assert!(
3846 !observed.is_empty(),
3847 "expected the adapter to have observed at least one batch",
3848 );
3849 let first = observed[0];
3850 for (i, &n) in observed.iter().enumerate() {
3851 assert_eq!(
3852 n, first,
3853 "batch {i} stamped a different nonce ({n:#x}) than the first \
3854 batch ({first:#x}) — at least one spawn site (initial-shard \
3855 loop or `add_shard_internal`) failed to inherit the bus's \
3856 producer_nonce",
3857 );
3858 }
3859
3860 let _ = std::fs::remove_file(&nonce_path);
3861 }
3862
3863 /// Regression guard for the `(process_nonce, shard_id,
3864 /// sequence_start, i)` JetStream / Redis msg-id construction.
3865 /// Within one bus instance, `sequence_start` per shard MUST be
3866 /// strictly monotonic across batches AND every two adjacent
3867 /// batches `n` and `n+1` from the same shard MUST satisfy
3868 /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3869 /// no overlap. A regression here breaks the at-most-once dedup
3870 /// every persistent adapter relies on: gaps reuse a `(shard,
3871 /// seq, i)` tuple after the dedup window closes, and overlap
3872 /// silently overlays a later batch on an earlier one's slot.
3873 ///
3874 /// The cross-restart variant (persistent `next_sequence` across
3875 /// process boots) is feature-shaped, not a bug fix — without
3876 /// persistence, restart relies on `process_nonce` rotating to
3877 /// disjoin the msg-id namespace (pinned by
3878 /// `persistent_producer_nonce_survives_bus_restart`). This test
3879 /// pins the within-process invariant the persistent nonce
3880 /// builds on.
3881 #[tokio::test]
3882 async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3883 struct ShardSeqRecorder {
3884 batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3885 }
3886 #[async_trait::async_trait]
3887 impl crate::adapter::Adapter for ShardSeqRecorder {
3888 async fn init(&mut self) -> Result<(), AdapterError> {
3889 Ok(())
3890 }
3891 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3892 self.batches.lock().push((
3893 batch.shard_id,
3894 batch.sequence_start,
3895 batch.events.len(),
3896 ));
3897 Ok(())
3898 }
3899 async fn flush(&self) -> Result<(), AdapterError> {
3900 Ok(())
3901 }
3902 async fn shutdown(&self) -> Result<(), AdapterError> {
3903 Ok(())
3904 }
3905 async fn poll_shard(
3906 &self,
3907 _: u16,
3908 _: Option<&str>,
3909 _: usize,
3910 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3911 Ok(crate::adapter::ShardPollResult::empty())
3912 }
3913 fn name(&self) -> &'static str {
3914 "shard-seq-recorder"
3915 }
3916 }
3917
3918 let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3919 let config = EventBusConfig::builder()
3920 .num_shards(4)
3921 .ring_buffer_capacity(1024)
3922 .build()
3923 .unwrap();
3924 let bus = EventBus::new_with_adapter(
3925 config,
3926 Box::new(ShardSeqRecorder {
3927 batches: batches.clone(),
3928 }),
3929 )
3930 .await
3931 .unwrap();
3932
3933 // Three drive-then-flush rounds so each shard sees multiple
3934 // batches; the across-batch monotonicity invariant is what
3935 // we're pinning, so a single batch per shard wouldn't
3936 // exercise it.
3937 for round in 0..3u64 {
3938 for i in 0..200u64 {
3939 bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3940 }
3941 bus.flush().await.unwrap();
3942 }
3943 bus.shutdown().await.unwrap();
3944
3945 let observed = batches.lock().clone();
3946 assert!(
3947 !observed.is_empty(),
3948 "expected the adapter to have observed at least one batch",
3949 );
3950
3951 // Bucket batches per shard, preserving dispatch order.
3952 let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3953 std::collections::HashMap::new();
3954 for (shard, seq_start, len) in observed {
3955 by_shard.entry(shard).or_default().push((seq_start, len));
3956 }
3957
3958 for (shard, runs) in &by_shard {
3959 assert!(
3960 !runs.is_empty(),
3961 "shard {shard}: must have at least one batch",
3962 );
3963 // First batch of every shard starts at the per-shard
3964 // zero — `BatchWorker::next_sequence` is initialized to
3965 // 0 and the first `flush()` reads it before the
3966 // `saturating_add`. A regression that lazily seeds
3967 // `next_sequence` from a non-zero source (e.g. wall
3968 // clock) would trip here.
3969 assert_eq!(
3970 runs[0].0, 0,
3971 "shard {shard}: first batch must start at sequence 0, got {}",
3972 runs[0].0,
3973 );
3974 for window in runs.windows(2) {
3975 let (prev_start, prev_len) = window[0];
3976 let (next_start, _next_len) = window[1];
3977 let expected_next = prev_start
3978 .checked_add(prev_len as u64)
3979 .expect("test bounds keep us well below u64::MAX");
3980 assert!(
3981 next_start > prev_start,
3982 "shard {shard}: sequence_start must be strictly monotonic; \
3983 saw {prev_start} → {next_start}",
3984 );
3985 assert_eq!(
3986 next_start, expected_next,
3987 "shard {shard}: gap or overlap in sequence_start; \
3988 prev=({prev_start}, len={prev_len}) → next={next_start}, \
3989 expected {expected_next}. A gap reuses (shard, seq, i) \
3990 tuples after the JetStream/Redis dedup window closes; an \
3991 overlap silently overlays a later batch on an earlier \
3992 one's slot.",
3993 );
3994 }
3995 }
3996 }
3997
3998 /// Pin the within-process caching contract for the fallback
3999 /// (no-`producer_nonce_path`) path: two bus instances created
4000 /// in the same process see the SAME `batch_process_nonce()`
4001 /// because the helper is `OnceLock`-cached. The
4002 /// "different-across-restarts" semantic is a *process-level*
4003 /// guarantee — restart the process to get a fresh nonce — and
4004 /// is pinned by `persistent_producer_nonce_survives_bus_restart`
4005 /// (which uses a path; the without-path branch of #56 has no
4006 /// cross-restart guarantee by design).
4007 ///
4008 /// Cubic-ai P3: this test was previously named
4009 /// `process_nonce_fallback_differs_across_bus_instances`, which
4010 /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
4011 /// Renamed to match what it actually pins.
4012 #[tokio::test]
4013 async fn process_nonce_fallback_is_cached_within_process() {
4014 struct NonceRecordingAdapter {
4015 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
4016 }
4017 #[async_trait::async_trait]
4018 impl crate::adapter::Adapter for NonceRecordingAdapter {
4019 async fn init(&mut self) -> Result<(), AdapterError> {
4020 Ok(())
4021 }
4022 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
4023 *self.nonce.lock() = Some(batch.process_nonce);
4024 Ok(())
4025 }
4026 async fn flush(&self) -> Result<(), AdapterError> {
4027 Ok(())
4028 }
4029 async fn shutdown(&self) -> Result<(), AdapterError> {
4030 Ok(())
4031 }
4032 async fn poll_shard(
4033 &self,
4034 _: u16,
4035 _: Option<&str>,
4036 _: usize,
4037 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
4038 Ok(crate::adapter::ShardPollResult::empty())
4039 }
4040 fn name(&self) -> &'static str {
4041 "nonce-recording"
4042 }
4043 }
4044
4045 let cfg = || {
4046 EventBusConfig::builder()
4047 .num_shards(1)
4048 .ring_buffer_capacity(1024)
4049 .build()
4050 .unwrap()
4051 };
4052
4053 let n_a = Arc::new(parking_lot::Mutex::new(None));
4054 let n_b = Arc::new(parking_lot::Mutex::new(None));
4055 {
4056 let bus = EventBus::new_with_adapter(
4057 cfg(),
4058 Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
4059 )
4060 .await
4061 .unwrap();
4062 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
4063 bus.flush().await.unwrap();
4064 bus.shutdown().await.unwrap();
4065 }
4066 {
4067 let bus = EventBus::new_with_adapter(
4068 cfg(),
4069 Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
4070 )
4071 .await
4072 .unwrap();
4073 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
4074 bus.flush().await.unwrap();
4075 bus.shutdown().await.unwrap();
4076 }
4077
4078 // Note: in a single-process test BOTH bus instances see the
4079 // same `OnceLock`-cached `batch_process_nonce`, so the
4080 // nonces ARE equal here even though the documented
4081 // semantic is "fresh per process." This test pins the
4082 // cached-within-a-process invariant; the across-PROCESSES
4083 // semantic is exercised by the persistent-nonce test
4084 // above (which is the actually-load-bearing path for the
4085 // persistent-nonce fix).
4086 let n_a = n_a.lock().unwrap();
4087 let n_b = n_b.lock().unwrap();
4088 assert_eq!(
4089 n_a, n_b,
4090 "within one process, batch_process_nonce is OnceLock-cached \
4091 so two bus instances see the same nonce — the \
4092 different-across-restarts contract is process-level, \
4093 pinned via `persistent_producer_nonce_survives_bus_restart`",
4094 );
4095 }
4096
4097 /// Regression: `EventBusStats::batches_dispatched`
4098 /// (and the new `events_dispatched`) must actually increment on
4099 /// every successful adapter dispatch. Pre-fix `batches_dispatched`
4100 /// was declared but never updated, so flush()'s Phase 2 progress
4101 /// gate was constant-zero and early-broke after one window —
4102 /// flake on Windows-class timer resolution. Pin both counters
4103 /// directly here so a future refactor that drops the increment
4104 /// fails this test, not the timing-dependent
4105 /// `flush_is_a_delivery_barrier`.
4106 #[tokio::test]
4107 async fn dispatch_increments_bus_level_event_and_batch_counters() {
4108 let received = Arc::new(AtomicU64::new(0));
4109 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4110 received: received.clone(),
4111 });
4112
4113 let config = EventBusConfig::builder()
4114 .num_shards(2)
4115 .ring_buffer_capacity(1024)
4116 .batch(crate::config::BatchConfig {
4117 min_size: 1,
4118 max_size: 10,
4119 max_delay: Duration::from_millis(10),
4120 adaptive: false,
4121 velocity_window: Duration::from_millis(100),
4122 })
4123 .build()
4124 .unwrap();
4125 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
4126
4127 for i in 0..50 {
4128 bus.ingest(Event::new(json!({"i": i}))).unwrap();
4129 }
4130 bus.flush().await.unwrap();
4131
4132 let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
4133 let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
4134 assert!(
4135 batches > 0,
4136 "batches_dispatched must be > 0 after flush — pre-fix it was \
4137 never incremented anywhere, breaking flush()'s Phase 2 progress gate",
4138 );
4139 assert_eq!(
4140 events, 50,
4141 "events_dispatched must equal the number of events handed to \
4142 the adapter (got {events}, expected 50)",
4143 );
4144
4145 bus.shutdown().await.unwrap();
4146 }
4147
4148 /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
4149 /// still release the drain-finalize gate so detached drain
4150 /// workers can exit instead of parking on the gate until the
4151 /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
4152 /// keeps the `Drop` impl honest if someone refactors the
4153 /// shutdown gates later.
4154 #[tokio::test]
4155 async fn drop_releases_drain_finalize_gate_promptly() {
4156 let config = EventBusConfig::builder()
4157 .num_shards(2)
4158 .ring_buffer_capacity(1024)
4159 .build()
4160 .unwrap();
4161 let bus = EventBus::new(config).await.unwrap();
4162 let drain_gate = bus.drain_finalize_ready.clone();
4163
4164 // Drop without an awaited shutdown.
4165 drop(bus);
4166
4167 // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
4168 // is 10s; if Drop didn't flip the gate, drain workers would
4169 // park for up to that long before exiting.
4170 assert!(
4171 drain_gate.load(AtomicOrdering::Acquire),
4172 "Drop must release `drain_finalize_ready` so detached drain \
4173 workers exit promptly"
4174 );
4175 }
4176
/// Pins the "drain caught up" arm of the lossy-shutdown reconciliation.
///
/// The reconciliation in the shutdown path (cited as L1615-L1635 when this
/// test was written; line references in this module may drift as the file
/// is edited) has TWO arms after the deadline snapshot fires:
///
/// - `actual_drops > 0` → bump `events_dropped` (pinned by the companion
///   test `lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up`)
/// - `actual_drops == 0` → clear the eagerly-set
///   `shutdown_was_lossy = true`, because the drain caught up to every
///   stranded ingest
///
/// The second arm is the false-positive-alert fix. Pre-fix, the `was_lossy`
/// flag stayed `true` alongside `events_dropped == 0` for any
/// deadline-triggered shutdown whose final drain happened to catch up.
/// Operator dashboards alerting on `was_lossy && dropped == 0` therefore saw
/// false positives.
///
/// We pin the clear-flag arm by:
/// 1. Manually staging `in_flight_ingests = 1` so the shutdown spin loop
///    sees pending work.
/// 2. Using `tokio::time::advance(6s)` to pass the hardcoded 5-second
///    deadline. The deadline uses `tokio::time::Instant` precisely so that
///    paused-time tests can virtualize it (see the comments cited as L1399
///    and L1725).
/// 3. Simulating the stranded producer completing AFTER the deadline
///    snapshot: bump `events_ingested += 1` and drop the in-flight counter
///    back to 0. The post-drain reconciliation then reads
///    `post_deadline_ingests = 1 - 0 = 1 >= stranded = 1`, so
///    `actual_drops == 0` and the arm clears `shutdown_was_lossy`.
///
/// NOTE: the statement order below (stage → spawn → yield → advance →
/// yield → assert → complete → join) is load-bearing under paused time.
/// Do not reorder it.
#[tokio::test(start_paused = true)]
async fn lossy_shutdown_reconciliation_clears_was_lossy_when_drain_catches_up() {
    let config = EventBusConfig::builder()
        .num_shards(1)
        .ring_buffer_capacity(1024)
        .without_scaling()
        .build()
        .unwrap();
    let bus = Arc::new(
        EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
            .await
            .unwrap(),
    );

    // Stage one "stranded" in-flight ingest. Real producers would have
    // incremented this counter via `try_enter_ingest` and would be mid-call
    // to `shard_manager.ingest`. The test helper (defined alongside
    // `EventBus`) holds the counter at 1 for the duration of the deadline
    // window.
    bus.stage_stranded_ingest(1);

    // Spawn shutdown. It enters the in-flight spin loop and parks on
    // `tokio::time::sleep(1ms)` until time advances.
    let bus_for_shutdown = Arc::clone(&bus);
    let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });

    // Yield enough times for the shutdown task to enter the spin loop. With
    // paused time, the spin's 1ms sleep blocks until we advance. The fixed
    // count of 10 is a scheduling heuristic, not a synchronization point.
    for _ in 0..10 {
        tokio::task::yield_now().await;
    }

    // Advance past the 5s deadline. The next loop iteration observes the
    // deadline, captures `(stranded=1, ingested=0, dispatched=0)`, sets
    // `was_lossy = true`, and breaks.
    tokio::time::advance(std::time::Duration::from_secs(6)).await;
    for _ in 0..10 {
        tokio::task::yield_now().await;
    }

    // The eagerly-set was_lossy flag is visible now.
    assert!(
        bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
        "deadline must have set was_lossy=true (eagerly)",
    );

    // Now simulate the "stranded" producer completing AFTER the deadline.
    // The helper bumps events_ingested with Release ordering before
    // releasing the in-flight slot. This matches what a real `ingest` call
    // site does, so the reconciliation sees the same arithmetic.
    bus.complete_stranded_ingest(1);

    // Let shutdown finish. Its post-drain reconciliation now reads
    // ingested_after=1, post_deadline_ingests=1,
    // actual_drops=stranded(1)-1=0, and therefore clears was_lossy.
    // `unwrap()` surfaces a panic inside the shutdown task. The shutdown
    // call's own return value is deliberately discarded: this test pins the
    // stats reconciliation below, not what shutdown returns.
    let _ = shutdown_task.await.unwrap();

    assert!(
        !bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
        "reconciliation must clear was_lossy when drain catches up to every stranded ingest \
         (post_deadline_ingests >= stranded → actual_drops == 0)",
    );
    // events_dropped must also NOT have been bumped. The whole point of the
    // reconciliation arm is that no event was actually lost.
    assert_eq!(
        bus.stats.events_dropped.load(AtomicOrdering::Acquire),
        0,
        "no events were actually dropped — reconciliation must not bump events_dropped",
    );
}
4284
/// Companion to `lossy_shutdown_reconciliation_clears_was_lossy_when_drain_catches_up`.
///
/// This pins the OTHER arm of the same reconciliation if/else (cited as
/// L1615-L1619 when written; line references may drift). When the drain
/// does NOT catch up (`post_deadline_ingests < stranded`),
/// `actual_drops > 0` and `events_dropped` is bumped. The `was_lossy = true`
/// flag, set eagerly before the reconciliation, stays true here.
///
/// Pre-fix, the same code path bumped `events_dropped += stranded` EAGERLY
/// *and* counted the same events as ingested. That broke the
/// `ingested == dispatched + dropped` invariant.
///
/// NOTE: as with the companion test, the statement order under paused time
/// is load-bearing. Do not reorder it.
#[tokio::test(start_paused = true)]
async fn lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up() {
    let config = EventBusConfig::builder()
        .num_shards(1)
        .ring_buffer_capacity(1024)
        .without_scaling()
        .build()
        .unwrap();
    let bus = Arc::new(
        EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
            .await
            .unwrap(),
    );

    // Stage two stranded ingests so the actual_drops count is meaningful
    // (not just 0 vs 1).
    bus.stage_stranded_ingest(2);

    let bus_for_shutdown = Arc::clone(&bus);
    let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });

    // Let shutdown reach its spin loop, push virtual time past the 5s
    // deadline, then let it take the deadline snapshot. The yield counts are
    // scheduling heuristics, not synchronization points.
    for _ in 0..10 {
        tokio::task::yield_now().await;
    }
    tokio::time::advance(std::time::Duration::from_secs(6)).await;
    for _ in 0..10 {
        tokio::task::yield_now().await;
    }

    // Drop the in-flight counter to 0 so later shutdown bookkeeping doesn't
    // see phantom in-flight work. Do NOT bump events_ingested: the producers
    // never "completed", so the reconciliation must classify them as drops.
    bus.release_stranded_ingest(2);

    // `unwrap()` surfaces a panic inside the shutdown task. The shutdown
    // call's own return value is deliberately discarded: this test pins the
    // stats below, not what shutdown returns.
    let _ = shutdown_task.await.unwrap();

    assert!(
        bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
        "was_lossy must stay true when drain did not catch up",
    );
    // Expected: stranded=2, post_deadline_ingests=0, actual_drops=2.
    // We don't pin `== 2` exactly because the "known under-count window"
    // documented alongside the reconciliation (cited as L1595-L1614)
    // acknowledges a ±1 bias under racy interleavings. The floor is what
    // matters.
    assert!(
        bus.stats.events_dropped.load(AtomicOrdering::Acquire) >= 1,
        "events_dropped must reflect at least one stranded ingest",
    );
}
4345
4346 /// `remove_shard_internal` at L832-L859 dispatches any
4347 /// stranded ring-buffer events through the adapter as a
4348 /// one-shot batch. The normal scale-down path drains the
4349 /// ring via the drain worker before remove fires, so
4350 /// `stranded.is_empty()` on the happy path and L832-L859
4351 /// never runs — but a race window between drain completion
4352 /// and remove (or a manual remove on a shard whose drain
4353 /// worker is parked) leaves events stranded. Pre-fix any
4354 /// stranded events would have been silently dropped; the
4355 /// L832-L859 path now flushes them and bumps both
4356 /// `batches_dispatched` (+1) and `events_dispatched` (+N).
4357 ///
4358 /// We force the stranded-events condition by:
4359 /// 1. Pausing time so the drain worker can't pull from
4360 /// the ring (its polling sleeps don't tick).
4361 /// 2. Pushing raw events directly into shard 0's ring via
4362 /// the `with_shard` API.
4363 /// 3. Calling `remove_shard_internal(0)` — its inner
4364 /// `shard_manager.remove_shard` then returns the
4365 /// stranded vec, triggering the L832-L859 dispatch.
4366 #[tokio::test(start_paused = true)]
4367 async fn remove_shard_internal_dispatches_stranded_ring_buffer_events() {
4368 let received = Arc::new(AtomicU64::new(0));
4369 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4370 received: Arc::clone(&received),
4371 });
4372 // Keep the default scaling policy: `remove_shard_internal`
4373 // routes through `shard_manager.remove_shard`, which
4374 // requires the dynamic-scaling mapper. `without_scaling()`
4375 // turns the mapper off and the remove fails with
4376 // "Dynamic scaling not enabled". The scaling MONITOR
4377 // task is a separate concern and isn't auto-started by
4378 // `new_with_adapter` — see
4379 // `start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it`
4380 // for the lifecycle contract.
4381 let config = EventBusConfig::builder()
4382 .num_shards(2)
4383 .ring_buffer_capacity(1024)
4384 .build()
4385 .unwrap();
4386 let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
4387
4388 // Yield enough times for the spawned drain workers to
4389 // reach their first `tokio::time::sleep` and park. With
4390 // `start_paused` they can't tick once parked; the gate
4391 // is here so the worker on shard 0 has *already parked*
4392 // before we push events into the ring. Without this, a
4393 // worker that hadn't yet hit its first park could race
4394 // the push and dispatch via the normal drain path
4395 // instead of the L832-L859 stranded path the test exists
4396 // to pin.
4397 for _ in 0..10 {
4398 tokio::task::yield_now().await;
4399 }
4400
4401 // Stage 5 events directly in shard 0's ring. With paused
4402 // time the drain worker can't pull them out — they sit
4403 // there until remove fires.
4404 let stranded_count = 5u64;
4405 let pushed = bus.shard_manager.with_shard(0, |shard| {
4406 for i in 0..stranded_count {
4407 shard
4408 .try_push_raw(bytes::Bytes::from(format!("stranded-{i}")))
4409 .unwrap();
4410 }
4411 });
4412 assert!(
4413 pushed.is_some(),
4414 "shard 0 must exist for try_push_raw to land",
4415 );
4416
4417 let batches_before = bus.stats.batches_dispatched.load(AtomicOrdering::Acquire);
4418 let events_before = bus.stats.events_dispatched.load(AtomicOrdering::Acquire);
4419
4420 // Directly invoke the internal — the production path
4421 // routes through manual_scale_down + finalize_draining,
4422 // but the L832-L859 dispatch arm is the same regardless
4423 // of how we got here.
4424 bus.remove_shard_internal(0)
4425 .await
4426 .expect("remove_shard_internal must succeed");
4427
4428 // The CountingAdapter saw the stranded batch.
4429 assert_eq!(
4430 received.load(AtomicOrdering::SeqCst),
4431 stranded_count,
4432 "stranded events must reach the adapter",
4433 );
4434 // Bus stats bumped by exactly the right amounts.
4435 assert_eq!(
4436 bus.stats.batches_dispatched.load(AtomicOrdering::Acquire),
4437 batches_before + 1,
4438 "exactly one stranded batch must increment batches_dispatched",
4439 );
4440 assert_eq!(
4441 bus.stats.events_dispatched.load(AtomicOrdering::Acquire),
4442 events_before + stranded_count,
4443 "events_dispatched must reflect every stranded event",
4444 );
4445
4446 // Clean shutdown so the remaining shard's drain worker
4447 // exits cleanly rather than getting torn down by the
4448 // bus's `Drop` impl (which silently loses any events
4449 // still in the ring buffers / mpsc channels). Mirrors
4450 // the lifecycle hygiene the other paused-time tests in
4451 // this module follow.
4452 //
4453 // The remaining drain worker (shard 1) is parked on
4454 // `tokio::time::sleep`; tokio's paused-time auto-advance
4455 // wakes it once every task is parked on time, but we
4456 // yield-loop here anyway so the JoinHandle await below
4457 // doesn't depend on auto-advance heuristics.
4458 bus.shutdown_via_ref()
4459 .await
4460 .expect("shutdown must succeed under paused time");
4461 }
4462
4463 /// `EventBus::new_with_adapter` must surface a `Fatal` error
4464 /// — naming the configured path — when `producer_nonce_path`
4465 /// points at a location the runtime can't read or create. The
4466 /// path-load failure path at L292-L301 is the single signal
4467 /// operators get that their persistent-nonce wiring is wrong;
4468 /// silently falling back to the per-process nonce would
4469 /// silently degrade the durability guarantee that path was
4470 /// chosen to provide (at-most-once across restart).
4471 #[tokio::test]
4472 async fn new_with_adapter_surfaces_fatal_when_nonce_path_unreadable() {
4473 // Point at a path whose parent directory doesn't exist.
4474 // `load_or_create` has to mkdir-then-write, so a missing
4475 // parent fails on every platform. Anchoring under
4476 // `std::env::temp_dir()` keeps the path well-formed on
4477 // both Windows (`C:\Users\...\Temp\...`) and Linux/macOS
4478 // (`/tmp/...`) — a hardcoded `C:\\…` literal looks like a
4479 // valid filename on POSIX filesystems and the create
4480 // happily succeeds, silently bypassing the failure-arm
4481 // this test exists to pin.
4482 let marker = "net-coverage-nonexistent-parent";
4483 let bogus_path = std::env::temp_dir()
4484 .join(marker)
4485 .join("deeply")
4486 .join("nested")
4487 .join("nonce");
4488 let config = EventBusConfig::builder()
4489 .num_shards(1)
4490 .ring_buffer_capacity(1024)
4491 .producer_nonce_path(&bogus_path)
4492 .build()
4493 .unwrap();
4494 let adapter: Box<dyn crate::adapter::Adapter> =
4495 Box::new(crate::adapter::NoopAdapter::new());
4496
4497 // EventBus doesn't impl Debug, so we can't unwrap_err
4498 // it directly — pattern-match instead.
4499 let result = EventBus::new_with_adapter(config, adapter).await;
4500 match result {
4501 Err(AdapterError::Fatal(msg)) => {
4502 assert!(
4503 msg.contains("producer-nonce") || msg.contains("nonce"),
4504 "error must name the failing subsystem: {msg}",
4505 );
4506 // The path must appear in the message so operators
4507 // can grep their logs to the misconfigured config
4508 // field without guessing. The unique marker
4509 // segment is portable across platforms (the temp
4510 // root differs but the joined marker is the same).
4511 assert!(
4512 msg.contains(marker),
4513 "error must include the configured path: {msg}",
4514 );
4515 }
4516 Err(other) => panic!("expected Fatal, got {other:?}"),
4517 Ok(_) => panic!("expected Fatal, got Ok"),
4518 }
4519 }
4520
4521 /// `start_scaling_monitor` must be a no-op on a non-scaling
4522 /// bus AND idempotent under repeated calls on a scaling bus.
4523 /// A regression that drops the early-returns would spawn
4524 /// shadow monitor tasks that hold a Weak<EventBus> until they
4525 /// next observe shutdown — pure-overhead duplicate
4526 /// metrics-tick wakeup rates that compete for the same
4527 /// `evaluate_scaling` lock.
4528 #[tokio::test]
4529 async fn start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it() {
4530 // Scaling explicitly disabled: start_scaling_monitor
4531 // must return before touching the slot. (The builder
4532 // default fills in a ScalingPolicy when scaling isn't
4533 // mentioned — `without_scaling()` is the only way to
4534 // force `config.scaling == None`.)
4535 let config = EventBusConfig::builder()
4536 .num_shards(1)
4537 .ring_buffer_capacity(1024)
4538 .without_scaling()
4539 .build()
4540 .unwrap();
4541 let bus = EventBus::new(config).await.unwrap();
4542 let bus = Arc::new(bus);
4543 bus.start_scaling_monitor();
4544 assert!(
4545 bus.scaling_monitor.lock().is_none(),
4546 "non-scaling bus must not install a monitor",
4547 );
4548
4549 // Scaling enabled: first call installs, second is a no-op
4550 // (the existing handle stays put — proves the early-return
4551 // didn't overwrite the slot).
4552 let policy = crate::shard::ScalingPolicy {
4553 min_shards: 2,
4554 max_shards: 4,
4555 ..Default::default()
4556 };
4557 let config = EventBusConfig::builder()
4558 .num_shards(2)
4559 .ring_buffer_capacity(1024)
4560 .scaling(policy)
4561 .build()
4562 .unwrap();
4563 let bus = Arc::new(EventBus::new(config).await.unwrap());
4564 // EventBus::new doesn't auto-start the monitor — we start
4565 // it explicitly so the test owns the lifecycle.
4566 bus.start_scaling_monitor();
4567 let handle_id_after_first = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4568 assert!(handle_id_after_first.is_some(), "first start must install");
4569
4570 // Second call must NOT replace the handle. If the early-
4571 // return is broken, the slot would carry a new handle id.
4572 bus.start_scaling_monitor();
4573 let handle_id_after_second = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4574 assert_eq!(
4575 handle_id_after_first, handle_id_after_second,
4576 "second start_scaling_monitor must NOT replace the running handle",
4577 );
4578 }
4579}