net/bus.rs
1//! Main EventBus facade.
2//!
3//! The EventBus provides a unified API for:
4//! - Event ingestion (non-blocking)
5//! - Event consumption (async polling with filtering)
6//! - Lifecycle management
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14
15use crate::adapter::{Adapter, NoopAdapter};
16use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
17use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
18use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
19use crate::event::{Batch, Event, RawEvent};
20use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
21
22#[cfg(feature = "jetstream")]
23use crate::adapter::JetStreamAdapter;
24#[cfg(feature = "net")]
25use crate::adapter::NetAdapter;
26#[cfg(feature = "redis")]
27use crate::adapter::RedisAdapter;
28
/// The main event bus.
///
/// Facade over the sharded ingestion path, the per-shard drain/batch
/// workers, the storage adapter and the cross-shard poll merger.
///
/// # Example
///
/// ```rust,ignore
/// use net::{EventBus, EventBusConfig, Event};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let bus = EventBus::new(EventBusConfig::default()).await?;
///
///     // Ingest events
///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
///
///     // Poll events
///     let response = bus.poll(ConsumeRequest::new(100)).await?;
///
///     bus.shutdown().await?;
///     Ok(())
/// }
/// ```
pub struct EventBus {
    /// Shard manager for parallel ingestion.
    shard_manager: Arc<ShardManager>,
    /// Adapter for durable storage.
    adapter: Arc<dyn Adapter>,
    /// Poll merger for cross-shard consumption. Replaced wholesale
    /// (via `ArcSwap::store`) whenever the shard topology changes.
    poll_merger: arc_swap::ArcSwap<PollMerger>,
    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
    /// block in `add_shard_internal` / `remove_shard_internal`.
    ///
    /// Without this lock, two callers (e.g. the scaling monitor's
    /// add_shard racing a manual scale-down's remove_shard) read the
    /// shard-id snapshot at slightly different points and then race
    /// on the `arc_swap.store`. The write that lands second can
    /// clobber the more-current view: T1 reads `{0..5}`, T2 reads
    /// `{1..4}`, T2 stores `{1..4}`, T1 stores `{0..5}` — the
    /// published merger then routes polls to the just-removed shard 0
    /// until the next topology change repairs the snapshot.
    poll_merger_swap_lock: parking_lot::Mutex<()>,
    /// Per-shard worker handles (shard_id -> handles).
    ///
    /// Drain and batch handles are stored separately so shutdown can
    /// await drain workers *before* batch workers. Otherwise the
    /// drain worker's final sweep races the batch worker's exit, and
    /// any events the drain worker pushes to the channel after the
    /// batch worker has stopped reading are silently lost.
    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
    /// Channels for sending batches to workers (shard_id -> sender).
    batch_senders: parking_lot::RwLock<
        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
    >,
    /// Shutdown flag, shared with every drain worker.
    shutdown: Arc<AtomicBool>,
    /// Gate signaling drain workers that the in-flight wait has
    /// completed and they may safely run their final ring-buffer
    /// sweep.
    ///
    /// Distinct from `shutdown` because a drain worker observing
    /// `shutdown=true` alone is not enough: a producer that read
    /// `shutdown=false` may still be mid-push, and if the drain
    /// worker rushes through its final sweep before that push is
    /// visible the event is stranded. `shutdown()` sets this after
    /// waiting for `in_flight_ingests==0`, at which point the Acquire
    /// load on the drain side synchronizes-with the Release store
    /// here, transitively chaining through the SeqCst in-flight
    /// handshake to make every observed-pre-shutdown push visible to
    /// the drain worker's subsequent `pop_batch_into`.
    drain_finalize_ready: Arc<AtomicBool>,
    /// In-flight ingest counter.
    ///
    /// Incremented before each ingest's shutdown check and
    /// decremented after the push completes (or bails). `shutdown()`
    /// waits for this to drop to zero *after* setting `shutdown=true`
    /// and *before* setting `drain_finalize_ready=true`, so no
    /// producer is mid-push when the drain workers do their final
    /// sweep. That closes the race where a producer that observed
    /// `shutdown=false` could push *after* the drain worker's last
    /// `pop_batch_into` returned zero, leaving the event stranded in
    /// the ring buffer.
    ///
    /// Deliberately `AtomicU64` rather than `AtomicU32`: the counter
    /// participates in the shutdown protocol, and a wrap to 0 would
    /// trick the wait-for-zero loop into proceeding while producers
    /// were still mid-push. A 4-billion-in-flight wrap is not
    /// realistic in production, but at 64 bits the wrap is
    /// astronomical (1.8e19 in-flight ingests).
    in_flight_ingests: AtomicU64,
    /// Set to `true` after `shutdown()` runs to completion. `Drop`
    /// uses this to detect "dropped without an awaited shutdown" —
    /// in that case events still in the ring buffers / mpsc channels
    /// are silently lost (see `Drop` impl).
    shutdown_completed: AtomicBool,
    /// Configuration.
    config: EventBusConfig,
    /// Statistics, shared with every batch worker.
    stats: Arc<EventBusStats>,
    /// Producer nonce.
    ///
    /// Loaded from `config.producer_nonce_path` on startup when the
    /// path is configured; otherwise falls back to the per-process
    /// default from `event::batch_process_nonce`. Stamped on every
    /// batch the bus emits — the worker spawn copies this u64 into
    /// `BatchWorkerParams::producer_nonce`, and the stranded-flush in
    /// `remove_shard_internal` uses it via `Batch::with_nonce`.
    producer_nonce: u64,
    /// Scaling monitor task handle; `None` until
    /// `start_scaling_monitor` installs one.
    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
}
133
/// Worker handles for a single shard.
///
/// The drain worker pumps events from the ring buffer into an mpsc
/// channel; the batch worker reads from that channel and dispatches
/// to the adapter. Shutdown ordering is load-bearing — see
/// `EventBus::shutdown`.
struct ShardWorkers {
    /// Handle of the task that reads the channel and dispatches
    /// batches to the adapter.
    batch: JoinHandle<()>,
    /// Handle of the task that moves events from the ring buffer
    /// into the channel.
    drain: JoinHandle<()>,
    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
    ///
    /// `remove_shard_internal` reads this AFTER awaiting `batch` to
    /// learn the worker's final post-flush sequence, then uses it as
    /// the `sequence_start` for the stranded-ring-buffer flush so the
    /// stranded msg-ids fall strictly past every msg-id the worker
    /// emitted. Without this, JetStream dedup would silently drop the
    /// stranded batch when both used 0.
    next_sequence: Arc<AtomicU64>,
}
150
151/// RAII guard for an in-flight ingest. Decrements
152/// `in_flight_ingests` on drop so `shutdown()` can wait for the
153/// counter to reach zero.
154struct IngestGuard<'a> {
155 bus: &'a EventBus,
156}
157
158impl Drop for IngestGuard<'_> {
159 fn drop(&mut self) {
160 self.bus
161 .in_flight_ingests
162 .fetch_sub(1, AtomicOrdering::SeqCst);
163 }
164}
165
/// Counters and flags describing what the event bus has done so far.
///
/// Every field is an atomic so the struct can be shared behind an
/// `Arc` and updated from worker tasks without further locking.
#[derive(Debug, Default)]
pub struct EventBusStats {
    /// Total events ingested.
    pub events_ingested: AtomicU64,
    /// Events dropped due to backpressure.
    pub events_dropped: AtomicU64,
    /// Batches dispatched to the adapter.
    ///
    /// Bumped after each successful `dispatch_batch` — both by the
    /// BatchWorker and by the stranded-flush paths that run when a
    /// shard is torn down. `flush()`'s Phase 2 progress probe reads
    /// this to answer "did the BatchWorker make progress during this
    /// `max_delay` window?", so it must actually move: a counter that
    /// never advances makes the probe see `0 == 0` and break out
    /// after a single window, which on coarse (Windows-class) timers
    /// regularly lost the race against the BatchWorker's first
    /// `recv_timeout`.
    pub batches_dispatched: AtomicU64,
    /// Total events dispatched to the adapter (sum of batch lengths
    /// from successful `on_batch` calls).
    ///
    /// Companion to `batches_dispatched`: by the time `flush()`
    /// returns, `events_dispatched + events_dropped ==
    /// events_ingested`. FFI consumers can also use this to monitor
    /// end-to-end delivery.
    pub events_dispatched: AtomicU64,
    /// `true` once `shutdown()` / `shutdown_via_ref()` had to proceed
    /// past the in-flight-ingest grace deadline (5 s) with producers
    /// still mid-push.
    ///
    /// The stranded events are already counted into `events_dropped`,
    /// but `shutdown` returns `Ok(())` regardless, so a caller that
    /// needs to tell a clean shutdown from a lossy one checks this
    /// flag through `bus.stats()` afterwards (only meaningful on the
    /// `shutdown_via_ref` path, which does not consume the bus).
    /// Without the flag, the warning log and the `events_dropped`
    /// bump would be the only signals.
    pub shutdown_was_lossy: AtomicBool,
}
205
206impl EventBus {
207 /// Create a new event bus with the given configuration.
208 pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
209 // Create adapter from config
210 let adapter: Box<dyn Adapter> = match &config.adapter {
211 AdapterConfig::Noop => Box::new(NoopAdapter::new()),
212 #[cfg(feature = "redis")]
213 AdapterConfig::Redis(redis_config) => {
214 Box::new(RedisAdapter::new(redis_config.clone())?)
215 }
216 #[cfg(feature = "jetstream")]
217 AdapterConfig::JetStream(js_config) => {
218 Box::new(JetStreamAdapter::new(js_config.clone())?)
219 }
220 #[cfg(feature = "net")]
221 AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
222 };
223
224 Self::new_with_adapter(config, adapter).await
225 }
226
227 /// Create a new event bus with a caller-supplied adapter.
228 ///
229 /// `config.adapter` is ignored — the supplied `adapter` is used
230 /// instead. Useful for tests that need to observe or inject
231 /// behavior at the adapter boundary (e.g. a counting adapter
232 /// for end-to-end delivery assertions, a flaky adapter for
233 /// retry-path coverage).
234 pub async fn new_with_adapter(
235 config: EventBusConfig,
236 mut adapter: Box<dyn Adapter>,
237 ) -> Result<Self, AdapterError> {
238 config
239 .validate()
240 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
241
242 // Initialize adapter (with timeout to prevent hanging on unreachable backends)
243 tokio::time::timeout(config.adapter_timeout, adapter.init())
244 .await
245 .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
246 let adapter: Arc<dyn Adapter> = Arc::from(adapter);
247
248 // Create shard manager (with or without dynamic scaling)
249 let shard_manager = if let Some(ref scaling_policy) = config.scaling {
250 Arc::new(
251 ShardManager::with_mapper(
252 config.num_shards,
253 config.ring_buffer_capacity,
254 config.backpressure_mode,
255 scaling_policy.clone(),
256 )
257 .map_err(|e| AdapterError::Fatal(e.to_string()))?,
258 )
259 } else {
260 Arc::new(ShardManager::new(
261 config.num_shards,
262 config.ring_buffer_capacity,
263 config.backpressure_mode,
264 ))
265 };
266
267 // Create poll merger.
268 //
269 // Pass the live id set rather than the count. At
270 // initial construction the ids are dense (`0..num_shards`),
271 // but using `shard_ids()` here keeps a single code path with
272 // the post-scaling re-stores below.
273 let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
274 adapter.clone(),
275 shard_manager.shard_ids(),
276 ));
277
278 // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
279 // doc on `EventBus` for the synchronization contract.
280 let shutdown = Arc::new(AtomicBool::new(false));
281 let drain_finalize_ready = Arc::new(AtomicBool::new(false));
282
283 // Stats are shared with every BatchWorker spawn so successful
284 // dispatches increment `batches_dispatched` / `events_dispatched`
285 // — `flush()`'s Phase 2 progress probe gates on those.
286 let stats = Arc::new(EventBusStats::default());
287
288 // Producer nonce. Persistent path → load-or-create
289 // the durable u64 so cross-process retries dedup against the
290 // prior incarnation. No path → per-process default (today's
291 // at-most-once-across-restart behavior).
292 let producer_nonce = match &config.producer_nonce_path {
293 Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
294 .map_err(|e| {
295 AdapterError::Fatal(format!(
296 "failed to load/create producer-nonce file at {}: {e}",
297 path.display(),
298 ))
299 })?
300 .nonce(),
301 None => crate::event::batch_process_nonce(),
302 };
303
304 // Create batch workers for each shard
305 let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
306 std::collections::HashMap::with_capacity(config.num_shards as usize);
307 let mut batch_senders =
308 std::collections::HashMap::with_capacity(config.num_shards as usize);
309
310 for shard_id in 0..config.num_shards {
311 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
312
313 let next_sequence = Arc::new(AtomicU64::new(0));
314
315 let batch = spawn_batch_worker(BatchWorkerParams {
316 shard_id,
317 rx,
318 adapter: adapter.clone(),
319 shard_manager: shard_manager.clone(),
320 config: config.batch.clone(),
321 adapter_timeout: config.adapter_timeout,
322 batch_retries: config.adapter_batch_retries,
323 next_sequence: next_sequence.clone(),
324 stats: stats.clone(),
325 producer_nonce,
326 });
327
328 let drain = spawn_drain_worker_for_shard(
329 shard_id,
330 shard_manager.clone(),
331 tx.clone(),
332 shutdown.clone(),
333 drain_finalize_ready.clone(),
334 );
335
336 batch_workers.insert(
337 shard_id,
338 ShardWorkers {
339 batch,
340 drain,
341 next_sequence,
342 },
343 );
344 batch_senders.insert(shard_id, tx);
345 }
346
347 let bus = Self {
348 shard_manager,
349 adapter,
350 poll_merger,
351 poll_merger_swap_lock: parking_lot::Mutex::new(()),
352 batch_workers: parking_lot::Mutex::new(batch_workers),
353 batch_senders: parking_lot::RwLock::new(batch_senders),
354 shutdown,
355 drain_finalize_ready,
356 in_flight_ingests: AtomicU64::new(0),
357 shutdown_completed: AtomicBool::new(false),
358 config,
359 stats,
360 producer_nonce,
361 scaling_monitor: parking_lot::Mutex::new(None),
362 };
363
364 Ok(bus)
365 }
366
367 /// Start the scaling monitor (if dynamic scaling is enabled).
368 /// This spawns a background task that periodically evaluates scaling decisions.
369 ///
370 /// The spawned task holds a `Weak<Self>` rather than a strong
371 /// `Arc<Self>` clone. With a strong clone the task kept the bus
372 /// alive forever, and `shutdown(self)` (which consumes by value)
373 /// was unreachable: callers with an `Arc<EventBus>` could not
374 /// `Arc::try_unwrap` to consume it because the spawned task always
375 /// held one of the strong refs.
376 ///
377 /// With a `Weak`, the monitor task upgrades each tick. Once the
378 /// last caller-held `Arc` is dropped, the upgrade fails and the
379 /// task exits cleanly. To shut down via `shutdown(self)`, the
380 /// caller must hold the only strong reference: `Arc::try_unwrap`
381 /// on the resulting bus succeeds because the spawned task only
382 /// holds a Weak.
383 pub fn start_scaling_monitor(self: &Arc<Self>) {
384 if self.config.scaling.is_none() {
385 return;
386 }
387
388 // Idempotency check: no-op when a monitor is already
389 // installed. Otherwise a second `start_scaling_monitor`
390 // call would overwrite the slot without aborting the
391 // previous `JoinHandle` — the displaced task would keep
392 // running, holding a `Weak<EventBus>`, only exiting when it
393 // next observed `shutdown` or failed to upgrade. Two
394 // concurrent monitors would briefly compete for
395 // `evaluate_scaling`'s lock, doubling the metrics-tick
396 // wakeup rate.
397 let mut slot = self.scaling_monitor.lock();
398 if slot.is_some() {
399 tracing::debug!("start_scaling_monitor: monitor already running, skipping");
400 return;
401 }
402
403 let weak = Arc::downgrade(self);
404 let handle = tokio::spawn(async move {
405 run_scaling_monitor_via_weak(weak).await;
406 });
407
408 *slot = Some(handle);
409 }
410
411 /// Internal: Add a new shard with its workers.
412 ///
413 /// The previous implementation called `shard_manager.add_shard()`
414 /// first, which atomically marked the shard `Active` and published
415 /// it to the routing table. So `select_shard` could route producer
416 /// pushes to the new id *before* any drain or batch worker existed,
417 /// leaving events queued in a buffer with no consumer (and
418 /// triggering the configured backpressure mode if the buffer
419 /// filled).
420 ///
421 /// The fix uses the two-phase API on `ShardManager`:
422 /// 1. `add_shard()` allocates the id and metrics collector,
423 /// adds the shard to the routing table in `Provisioning`
424 /// state — so `with_shard` works (which the drain worker
425 /// needs) but `select_shard` skips it.
426 /// 2. Spawn batch + drain workers and register the sender.
427 /// 3. `activate_shard()` flips state to `Active`. Only now
428 /// does `select_shard` start routing producer pushes.
429 async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
430 self.add_shard_internal_with_cooldown_policy(false).await
431 }
432
433 /// Like [`add_shard_internal`] but bypasses the auto-scaling
434 /// cooldown. See [`ShardManager::add_shard_force`].
435 async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
436 self.add_shard_internal_with_cooldown_policy(true).await
437 }
438
439 async fn add_shard_internal_with_cooldown_policy(
440 &self,
441 force: bool,
442 ) -> Result<u16, AdapterError> {
443 // Step 1: provisioning add — not yet selectable.
444 let new_id = if force {
445 self.shard_manager.add_shard_force()
446 } else {
447 self.shard_manager.add_shard()
448 }
449 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
450
451 // Step 2: spawn workers and register the sender.
452 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
453
454 let next_sequence = Arc::new(AtomicU64::new(0));
455
456 let batch = spawn_batch_worker(BatchWorkerParams {
457 shard_id: new_id,
458 rx,
459 adapter: self.adapter.clone(),
460 shard_manager: self.shard_manager.clone(),
461 config: self.config.batch.clone(),
462 adapter_timeout: self.config.adapter_timeout,
463 batch_retries: self.config.adapter_batch_retries,
464 next_sequence: next_sequence.clone(),
465 stats: self.stats.clone(),
466 producer_nonce: self.producer_nonce,
467 });
468
469 // Order of publish here matters. Today `select_shard`
470 // filters on `state == Active` and a shard is still
471 // `Provisioning` at this point, so producers cannot route
472 // to it yet — but the pattern (insert sender → spawn drain
473 // worker → activate_shard) is ordering-fragile: a future
474 // refactor that flips `activate_shard` ahead of the drain-
475 // worker spawn would leave a window where producers can
476 // push but no drain worker is polling. Pin the order by
477 // installing the sender BEFORE the drain worker (existing
478 // behaviour) so the worker observes a fully-published
479 // sender at spawn time; `activate_shard` runs further down.
480 self.batch_senders.write().insert(new_id, tx.clone());
481
482 let drain = spawn_drain_worker_for_shard(
483 new_id,
484 self.shard_manager.clone(),
485 tx,
486 self.shutdown.clone(),
487 self.drain_finalize_ready.clone(),
488 );
489
490 self.batch_workers.lock().insert(
491 new_id,
492 ShardWorkers {
493 batch,
494 drain,
495 next_sequence,
496 },
497 );
498
499 // Step 3: workers are live — flip the shard to Active so
500 // `select_shard` will route to it.
501 //
502 // On `activate_shard` failure we mirror
503 // `remove_shard_internal`'s teardown: drop the sender,
504 // unmap the provisioning entry (which atomically pops any
505 // residual ring-buffer events), gracefully await both
506 // workers (so the drain worker's `scratch` Vec sends its
507 // contents on the channel and the batch worker's
508 // `current_batch` is flushed via the channel-close path),
509 // then dispatch any stranded ring-buffer events through
510 // the adapter. Pre-fix this used `.abort()` on both
511 // handles which dropped the drain
512 // worker's scratch and the batch worker's current_batch
513 // without dispatch.
514 if let Err(e) = self.shard_manager.activate_shard(new_id) {
515 tracing::warn!(
516 shard_id = new_id,
517 error = %e,
518 "activate_shard failed; rolling back provisioning state"
519 );
520
521 // 1. Drop the bus-side sender. The drain worker still
522 // holds its own clone, so the channel stays open
523 // until `with_shard` returns None (step 2) and the
524 // drain worker breaks out of its loop, dropping
525 // its sender and finally closing the channel.
526 self.batch_senders.write().remove(&new_id);
527
528 // 2. Atomically pop any ring-buffer residue and
529 // unmap the Provisioning entry. After this,
530 // `with_shard(new_id)` returns None and the drain
531 // worker exits at its next poll (after sending
532 // any events it had already popped into `scratch`
533 // on this iteration).
534 //
535 // For a brand-new Provisioning shard the buffer
536 // should be empty (`select_shard` skips
537 // non-Active states), so `stranded` is normally
538 // `Vec::new()`. The flush below is a defensive
539 // no-op on the happy path but covers any future
540 // code path that routes to a Provisioning shard
541 // or any race window that tucked an event in
542 // before `activate_shard` returned its error.
543 let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
544
545 // 3. Take ownership of the worker handles and await
546 // them gracefully. Order: drain first (it pumps
547 // its scratch + final-sweep contents into the
548 // channel and exits), then batch (which receives
549 // those events plus any prior channel residue,
550 // flushes its own `current_batch`, and exits).
551 //
552 // Awaiting in this order is what makes the drain
553 // worker's scratch reach the adapter — the
554 // drain's `Some(N>0)` arm `mem::replace`s scratch
555 // into a batch and `sender.send(batch).await`s
556 // it; that send must complete (or fail) before
557 // the drain worker breaks. The batch worker's
558 // `Ok(None)` arm then runs after both senders
559 // are dropped and flushes any pending batch.
560 // Bound each JoinHandle await so a worker that's
561 // parked inside a slow adapter call (e.g. drain worker
562 // mid-`sender.send().await` against a backpressured
563 // channel because the batch worker is itself blocked
564 // in the adapter) cannot pin rollback indefinitely.
565 // 2x `adapter_timeout` is the natural ceiling: the
566 // batch worker uses `adapter_timeout` per dispatch and
567 // is expected to wake within that window. A timeout
568 // here leaks the JoinHandle — acceptable because
569 // step 1 already removed the bus-side sender, so the
570 // detached task can't observe new work and will exit
571 // on its next loop iteration.
572 let workers = self.batch_workers.lock().remove(&new_id);
573 // `worker_detached` is set when either join times out
574 // — the worker is still running on a leaked
575 // JoinHandle. In that case the `next_sequence` atomic
576 // is no longer a reliable upper bound: the detached
577 // worker may publish a final batch whose msg-ids land
578 // in the same `[next_sequence..N]` range we'd use for
579 // the stranded-flush, producing duplicate XADDs or
580 // JetStream dedup hits. Skip the stranded-flush when
581 // detached and surface the loss explicitly so the
582 // operator sees it.
583 let mut worker_detached = false;
584 let final_next_sequence = if let Some(workers) = workers {
585 let bound = self.config.adapter_timeout.saturating_mul(2);
586 match tokio::time::timeout(bound, workers.drain).await {
587 Ok(Ok(())) => {}
588 Ok(Err(err)) => {
589 tracing::warn!(
590 shard_id = new_id,
591 error = %err,
592 "drain worker JoinHandle errored on activate-failure rollback"
593 );
594 }
595 Err(_) => {
596 tracing::warn!(
597 shard_id = new_id,
598 timeout_ms = bound.as_millis() as u64,
599 "drain worker did not exit within timeout on activate-failure rollback; detaching"
600 );
601 worker_detached = true;
602 }
603 }
604 match tokio::time::timeout(bound, workers.batch).await {
605 Ok(Ok(())) => {}
606 Ok(Err(err)) => {
607 tracing::warn!(
608 shard_id = new_id,
609 error = %err,
610 "BatchWorker JoinHandle errored on activate-failure rollback"
611 );
612 }
613 Err(_) => {
614 tracing::warn!(
615 shard_id = new_id,
616 timeout_ms = bound.as_millis() as u64,
617 "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
618 );
619 worker_detached = true;
620 }
621 }
622 workers.next_sequence.load(AtomicOrdering::Acquire)
623 } else {
624 0
625 };
626
627 // 4. Dispatch any stranded events as a single-shot
628 // batch so they reach durable storage with the
629 // correct sequence-id segment. Identical to the
630 // `remove_shard_internal` teardown — but only when
631 // the worker actually exited. If it timed out and
632 // is still running on a leaked handle, dispatching
633 // here would emit msg-ids overlapping the worker's
634 // final flush; surface the events as dropped
635 // instead so the duplicate-on-the-wire hazard is
636 // avoided.
637 if !stranded.is_empty() && worker_detached {
638 let count = stranded.len();
639 self.stats
640 .events_dropped
641 .fetch_add(count as u64, AtomicOrdering::Relaxed);
642 self.stats
643 .shutdown_was_lossy
644 .store(true, AtomicOrdering::Release);
645 tracing::error!(
646 shard_id = new_id,
647 count,
648 "activate-failure rollback: skipping stranded-flush \
649 because a worker JoinHandle timed out and may still \
650 be running; events would collide with the detached \
651 worker's final batch on the wire. Counted as dropped."
652 );
653 } else if !stranded.is_empty() {
654 let count = stranded.len();
655 let batch = crate::event::Batch::with_nonce(
656 new_id,
657 stranded,
658 final_next_sequence,
659 self.producer_nonce,
660 );
661 let dispatched = dispatch_batch(
662 &*self.adapter,
663 Arc::new(batch),
664 new_id,
665 self.config.adapter_timeout,
666 self.config.adapter_batch_retries,
667 )
668 .await;
669 if dispatched {
670 self.stats
671 .batches_dispatched
672 .fetch_add(1, AtomicOrdering::Relaxed);
673 self.stats
674 .events_dispatched
675 .fetch_add(count as u64, AtomicOrdering::Relaxed);
676 tracing::info!(
677 shard_id = new_id,
678 count,
679 "activate-failure rollback: flushed stranded events to adapter",
680 );
681 } else {
682 tracing::error!(
683 shard_id = new_id,
684 count,
685 "activate-failure rollback: adapter rejected stranded events; \
686 events lost"
687 );
688 }
689 }
690
691 return Err(AdapterError::Fatal(e.to_string()));
692 }
693
694 // Update poll merger with the post-add id set. Hold
695 // `poll_merger_swap_lock` across the snapshot-and-store so a
696 // concurrent remove_shard can't sneak between our `shard_ids()`
697 // read and our `arc_swap.store` and clobber the published view
698 // with a stale snapshot.
699 {
700 let _swap_guard = self.poll_merger_swap_lock.lock();
701 self.poll_merger.store(Arc::new(PollMerger::new(
702 self.adapter.clone(),
703 self.shard_manager.shard_ids(),
704 )));
705 }
706
707 tracing::info!(shard_id = new_id, "Added new shard");
708 Ok(new_id)
709 }
710
711 /// Internal: Remove a stopped shard.
712 ///
713 /// Previously this dropped the worker `JoinHandle`s and unmapped
714 /// the shard without first draining its ring buffer. Any events
715 /// still queued at the moment of removal — even just a few from a
716 /// producer that pushed concurrently with the scale-down decision
717 /// — were silently stranded once the drain worker exited on
718 /// `with_shard → None`.
719 ///
720 /// The fix:
721 /// 1. Wait for the drain worker we're about to retire to flush
722 /// the channel, by closing the bus-side sender first.
723 /// 2. Call `remove_shard`, which atomically pops the
724 /// ring-buffer remnants and unmaps the shard. The drained
725 /// events come back to us as a `Vec`.
726 /// 3. Hand those events directly to the adapter as a
727 /// single-shot batch — bypassing the per-shard pipeline
728 /// that's already being torn down — so they reach durable
729 /// storage.
730 async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
731 // Step 1: drop the bus-side sender. The drain worker still
732 // has its own clone and will keep draining; we want it to
733 // exit when its `with_shard` call returns `None` after
734 // step 2's unmap.
735 self.batch_senders.write().remove(&shard_id);
736
737 // Step 2: atomically drain whatever's in the ring buffer and
738 // unmap. After this, `with_shard(shard_id)` returns `None`
739 // and the drain worker exits at its next poll.
740 let stranded = self
741 .shard_manager
742 .remove_shard(shard_id)
743 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
744
745 // Step 3: take ownership of the worker handles (move them
746 // OUT of the mutex map so we can `await` them — `await`
747 // consumes a `JoinHandle`). With the bus-side sender already
748 // dropped (step 1) and the shard unmapped (step 2), the
749 // drain worker exits at its next poll and drops its sender
750 // clone, which closes the BatchWorker's `rx`. The
751 // BatchWorker then flushes any pending `current_batch` and
752 // any events still buffered in the channel, dispatches them
753 // via the standard `dispatch_batch` path with their PROPER
754 // `next_sequence` values, and exits.
755 //
756 // Await order: drain first, then batch. The drain worker's
757 // `Some(N>0)` arm `mem::replace`s scratch into a batch and
758 // `sender.send(batch).await`s it; that send must complete
759 // (or fail) before drain breaks. The batch worker's
760 // `Ok(None)` arm runs after the sender drops and flushes
761 // any pending batch. Awaiting in the reverse order would
762 // park here forever: the batch worker's `recv()` only
763 // returns `None` once every sender clone (including the
764 // drain worker's) has dropped.
765 //
766 // Both awaits are bounded by `2 × adapter_timeout` so a
767 // worker parked inside a slow adapter call cannot pin
768 // teardown indefinitely. A timeout leaks the JoinHandle —
769 // acceptable because step 1 already removed the bus-side
770 // sender and step 2 unmapped the shard, so the detached
771 // task can't observe new work and will exit on its next
772 // loop iteration.
773 let workers = self.batch_workers.lock().remove(&shard_id);
774 let final_next_sequence = if let Some(workers) = workers {
775 let bound = self.config.adapter_timeout.saturating_mul(2);
776 match tokio::time::timeout(bound, workers.drain).await {
777 Ok(Ok(())) => {}
778 Ok(Err(e)) => {
779 tracing::warn!(
780 shard_id,
781 error = %e,
782 "drain worker JoinHandle errored on await; \
783 drain worker should have already exited via \
784 `with_shard -> None`",
785 );
786 }
787 Err(_) => {
788 tracing::warn!(
789 shard_id,
790 timeout_ms = bound.as_millis() as u64,
791 "drain worker did not exit within timeout on remove_shard; detaching",
792 );
793 }
794 }
795 match tokio::time::timeout(bound, workers.batch).await {
796 Ok(Ok(())) => {}
797 Ok(Err(e)) => {
798 tracing::warn!(
799 shard_id,
800 error = %e,
801 "BatchWorker JoinHandle errored on await; \
802 proceeding with stranded-flush using last \
803 published next_sequence",
804 );
805 }
806 Err(_) => {
807 tracing::warn!(
808 shard_id,
809 timeout_ms = bound.as_millis() as u64,
810 "BatchWorker did not exit within timeout on remove_shard; detaching",
811 );
812 }
813 }
814 workers.next_sequence.load(AtomicOrdering::Acquire)
815 } else {
816 // No worker registered for this shard — shouldn't
817 // happen on the normal scale-down path, but defensively
818 // fall back to 0. This branch only activates if a
819 // caller manages to call `remove_shard_internal` for a
820 // shard that was never spawned (or already removed).
821 0
822 };
823
824 // Step 4: flush the stranded ring-buffer events through the
825 // adapter in one shot, using `final_next_sequence` (NOT 0)
826 // as the `sequence_start`. The stranded batch's msg-ids
827 // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
828 // strictly past every msg-id the worker emitted. Using 0
829 // would collide with the worker's very first batch
830 // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
831 // window would silently drop the duplicate.
832 if !stranded.is_empty() {
833 let count = stranded.len();
834 // Use the bus's loaded producer nonce so the stranded
835 // batch's msg-ids share the same producer-identity
836 // segment as everything else this bus has emitted —
837 // critical for cross-process dedup.
838 let batch = crate::event::Batch::with_nonce(
839 shard_id,
840 stranded,
841 final_next_sequence,
842 self.producer_nonce,
843 );
844 let dispatched = dispatch_batch(
845 &*self.adapter,
846 Arc::new(batch),
847 shard_id,
848 self.config.adapter_timeout,
849 self.config.adapter_batch_retries,
850 )
851 .await;
852 if dispatched {
853 self.stats
854 .batches_dispatched
855 .fetch_add(1, AtomicOrdering::Relaxed);
856 self.stats
857 .events_dispatched
858 .fetch_add(count as u64, AtomicOrdering::Relaxed);
859 tracing::info!(
860 shard_id,
861 count,
862 sequence_start = final_next_sequence,
863 "Removed shard: flushed stranded ring-buffer events to adapter"
864 );
865 } else {
866 tracing::error!(
867 shard_id,
868 count,
869 sequence_start = final_next_sequence,
870 "Removed shard: adapter rejected stranded ring-buffer events; \
871 events lost"
872 );
873 }
874 }
875
876 // Update poll merger with the post-remove id set.
877 // Without this, a default-shards poll (`request.shards == None`)
878 // would still iterate `0..num_shards` and skip the live shard
879 // whose id is now the largest, while polling a nonexistent /
880 // recreated shard at the bottom of the range.
881 //
882 // `poll_merger_swap_lock` serializes against
883 // `add_shard_internal`'s matching block — see the field doc.
884 {
885 let _swap_guard = self.poll_merger_swap_lock.lock();
886 self.poll_merger.store(Arc::new(PollMerger::new(
887 self.adapter.clone(),
888 self.shard_manager.shard_ids(),
889 )));
890 }
891
892 tracing::info!(shard_id = shard_id, "Removed shard");
893 Ok(())
894 }
895
896 /// Try to enter an ingest critical section. Returns `None` if
897 /// shutdown is in progress, in which case the caller must
898 /// return `IngestionError::ShuttingDown` without touching the
899 /// shard manager.
900 ///
901 /// The `fetch_add` + load(`shutdown`) sequence pairs with
902 /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
903 /// `in_flight_ingests`. SeqCst on both sides closes the
904 /// stranding race: every ingest that is observed as in-flight
905 /// during shutdown's wait is guaranteed to complete before the
906 /// drain workers do their final ring-buffer sweep, so no event
907 /// can land in a ring buffer after the drain worker has stopped
908 /// reading from it.
909 #[inline(always)]
910 fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
911 self.in_flight_ingests.fetch_add(1, AtomicOrdering::SeqCst);
912 if self.shutdown.load(AtomicOrdering::SeqCst) {
913 self.in_flight_ingests.fetch_sub(1, AtomicOrdering::SeqCst);
914 return None;
915 }
916 Some(IngestGuard { bus: self })
917 }
918
919 /// Ingest an event.
920 ///
921 /// This is a non-blocking operation. The event is added to the appropriate
922 /// shard's ring buffer and will be batched and persisted asynchronously.
923 ///
924 /// # Returns
925 ///
926 /// The shard ID and insertion timestamp on success.
927 #[inline]
928 pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
929 let _g = self
930 .try_enter_ingest()
931 .ok_or(IngestionError::ShuttingDown)?;
932
933 match self.shard_manager.ingest(event.into_inner()) {
934 Ok((shard_id, ts)) => {
935 self.stats
936 .events_ingested
937 .fetch_add(1, AtomicOrdering::Relaxed);
938 Ok((shard_id, ts))
939 }
940 Err(e) => {
941 self.stats
942 .events_dropped
943 .fetch_add(1, AtomicOrdering::Relaxed);
944 Err(e)
945 }
946 }
947 }
948
949 /// Ingest a raw event (pre-serialized with cached hash).
950 ///
951 /// This is the fastest ingestion path:
952 /// - Uses pre-computed hash for shard selection (no serialization)
953 /// - Stores bytes directly (no clone needed, reference-counted)
954 ///
955 /// # Returns
956 ///
957 /// The shard ID and insertion timestamp on success.
958 #[inline]
959 pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
960 let _g = self
961 .try_enter_ingest()
962 .ok_or(IngestionError::ShuttingDown)?;
963
964 match self.shard_manager.ingest_raw(event) {
965 Ok((shard_id, ts)) => {
966 self.stats
967 .events_ingested
968 .fetch_add(1, AtomicOrdering::Relaxed);
969 Ok((shard_id, ts))
970 }
971 Err(e) => {
972 self.stats
973 .events_dropped
974 .fetch_add(1, AtomicOrdering::Relaxed);
975 Err(e)
976 }
977 }
978 }
979
980 /// Ingest a batch of events.
981 ///
982 /// This is more efficient than calling `ingest` repeatedly: events
983 /// destined for the same shard share a single mutex acquisition.
984 ///
985 /// # Returns
986 ///
987 /// The number of successfully ingested events.
988 pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
989 // The shutdown gate lives in `ingest_raw_batch`, which we
990 // forward to. No separate guard here — that would double-
991 // count `in_flight_ingests` (once for this call, once for
992 // the inner call) and could deadlock shutdown under high
993 // contention.
994 let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
995 self.ingest_raw_batch(raw)
996 }
997
998 /// Ingest a batch of raw events (fastest batch ingestion).
999 ///
1000 /// Groups events by their destination shard and pushes each group
1001 /// under a single mutex acquisition.
1002 ///
1003 /// # Returns
1004 ///
1005 /// The number of successfully ingested events.
1006 pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1007 let _g = match self.try_enter_ingest() {
1008 Some(g) => g,
1009 None => return 0,
1010 };
1011
1012 let total = events.len();
1013 let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1014 if success > 0 {
1015 self.stats
1016 .events_ingested
1017 .fetch_add(success as u64, AtomicOrdering::Relaxed);
1018 }
1019 // Subtract `unrouted` from the buffer-fullness drop count
1020 // so the same event isn't tallied in both `events_unrouted`
1021 // (bumped inside `ShardManager::ingest_raw_batch`) and
1022 // `events_dropped` — using a plain `total - success` here
1023 // would double-count unrouted events. Backpressure drops
1024 // = events that reached a shard but failed to push.
1025 let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1026 if dropped > 0 {
1027 self.stats
1028 .events_dropped
1029 .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1030 }
1031 success
1032 }
1033
1034 /// Poll events from the bus.
1035 ///
1036 /// This retrieves events from storage according to the request parameters.
1037 ///
1038 /// # Topology-change visibility
1039 ///
1040 /// `ArcSwap::load()` snapshots the current `PollMerger` for
1041 /// the duration of this call. A concurrent `add_shard` /
1042 /// `remove_shard_internal` that `.store()`s a fresh merger
1043 /// only affects **subsequent** polls — this poll continues
1044 /// against the loaded snapshot's shard list.
1045 ///
1046 /// The implications:
1047 /// - **add_shard mid-poll:** events ingested into the new
1048 /// shard between the merger swap and our return are
1049 /// invisible to this call. They appear on the next poll.
1050 /// - **remove_shard_internal mid-poll:** the stale merger
1051 /// still has the removed shard in its id list. Adapters
1052 /// that lazy-create streams on `poll_shard` (JetStream
1053 /// in particular) may recreate the stream and return
1054 /// empty/stale data. The drained events are dispatched
1055 /// to durable storage by `remove_shard_internal` itself
1056 /// before this poll's adapter call lands; the next poll
1057 /// loads the new merger and sees the correct shard set.
1058 ///
1059 /// In both cases the loss is transient and self-healing:
1060 /// pagination via `next_id` and the next poll's cursor pick
1061 /// up where we left off. Callers requiring strict
1062 /// "topology-stable" semantics should serialize their polls
1063 /// against scaling operations externally.
1064 pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1065 let merger = self.poll_merger.load();
1066 merger.poll(request).await
1067 }
1068
1069 /// Get the number of shards.
1070 pub fn num_shards(&self) -> u16 {
1071 self.shard_manager.num_shards()
1072 }
1073
1074 /// Get the adapter name.
1075 pub fn adapter_name(&self) -> &'static str {
1076 self.adapter.name()
1077 }
1078
1079 /// Check if the adapter is healthy.
1080 pub async fn is_healthy(&self) -> bool {
1081 self.adapter.is_healthy().await
1082 }
1083
1084 /// Get statistics.
1085 pub fn stats(&self) -> &EventBusStats {
1086 &self.stats
1087 }
1088
1089 /// Get shard statistics.
1090 pub fn shard_stats(&self) -> crate::shard::ShardStats {
1091 self.shard_manager.stats()
1092 }
1093
1094 /// Sum of `len()` across every shard's ring buffer.
1095 ///
1096 /// Mainly useful in tests and operational diagnostics: a
1097 /// non-zero value at the time of `Drop` (without an awaited
1098 /// `shutdown()`) would be silently lost, so `Drop` folds this
1099 /// into `events_dropped` before the bus disappears.
1100 pub fn pending_in_rings(&self) -> u64 {
1101 self.shard_manager.total_pending_in_rings()
1102 }
1103
1104 /// Flush all pending batches.
1105 ///
1106 /// Waits for all shard ring buffers to drain, then for the
1107 /// per-shard mpsc channels to drain, then for any pending batch
1108 /// inside each batch worker to time out and dispatch — and only
1109 /// then calls `adapter.flush()`.
1110 ///
1111 /// # Latency bound
1112 ///
1113 /// The total wall-clock budget is the sum of three phases:
1114 /// * Phase 1 (ring-buffer drain): up to **5 s**.
1115 /// * Phase 2 (channel + pending-batch drain): up to
1116 /// `min(2 s, batch.max_delay × n_workers)` — capped at 2 s
1117 /// so a misconfigured `max_delay` cannot inflate the budget.
1118 /// * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
1119 /// (default **30 s**).
1120 ///
1121 /// Worst-case `flush()` runtime is therefore **~37 s under
1122 /// default config**, NOT 5 s as an earlier doc-comment stated.
1123 /// Callers wiring `flush()` into request-path latencies (HTTP
1124 /// handler, RPC) MUST set `adapter_timeout` accordingly or run
1125 /// the flush under their own outer timeout. The 5-second figure
1126 /// describes Phase 1 only; the doc was misleading and is fixed
1127 /// here.
1128 ///
1129 /// The previous implementation slept a single `batch.max_delay`
1130 /// (default 10 ms) after the ring buffers drained and immediately
1131 /// called `adapter.flush()`. Events still in transit through the
1132 /// per-shard mpsc channel, the batch worker's pending batch, or
1133 /// the in-progress `adapter.on_batch` call (bounded only by
1134 /// `adapter_timeout`, default 30 s) could miss the flush. Callers
1135 /// using `flush()` as a delivery barrier silently lost events.
1136 pub async fn flush(&self) -> Result<(), AdapterError> {
1137 let start = tokio::time::Instant::now();
1138 let timeout = Duration::from_secs(5);
1139 let mut backoff = Duration::from_micros(100);
1140
1141 // Phase 1: wait for ring buffers to drain (drain workers
1142 // pump them into the per-shard mpsc channels).
1143 loop {
1144 if self.shard_manager.all_shards_empty() {
1145 break;
1146 }
1147 if start.elapsed() >= timeout {
1148 tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
1149 break;
1150 }
1151 tokio::time::sleep(backoff).await;
1152 backoff = (backoff * 2).min(Duration::from_millis(10));
1153 }
1154
1155 // Phase 2: wait until every event ingested before flush()
1156 // started has been handed to the adapter via `on_batch`.
1157 // Snapshot the `events_ingested` counter at flush entry —
1158 // that's our target. We then poll `events_dispatched` (sum
1159 // of batch lengths from successful adapter dispatches) plus
1160 // `events_dropped` (events the adapter rejected after retry
1161 // exhaustion or that never made it past backpressure). The
1162 // barrier is met when `events_dispatched + events_dropped >=
1163 // target`: every pre-flush ingest is accounted for in one
1164 // bucket or the other.
1165 //
1166 // This is a true delivery barrier — it doesn't rely on
1167 // "no progress this window" heuristics that race a
1168 // BatchWorker whose `batch_start` was set just before
1169 // flush() ran. A progress gate that reads
1170 // `batches_dispatched` only works if that counter is
1171 // actually incremented on every dispatch, and Windows
1172 // timer resolution alone has historically made any
1173 // single-`max_delay`-sleep approach a frequent flake.
1174 let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1175 let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1176 let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1177
1178 // Outer deadline still bounds Phase 2 in case a wedged
1179 // adapter never returns. `max_delay * num_workers` is the
1180 // worst-case shape (one partially-filled batch per worker,
1181 // each waiting its full `max_delay` to time out), capped at
1182 // 2 s — same upper bound as before.
1183 //
1184 // Read the worker count via the shard manager's atomic-
1185 // backed `num_shards()` rather than `batch_workers.lock()
1186 // .len()`. The previous spinlock-backed `.lock()` inside
1187 // an `async fn` could stall the runtime worker thread
1188 // under contention with concurrent `add_shard_internal` /
1189 // `remove_shard_internal` callers; the atomic accessor is
1190 // both faster and async-safe. Mismatch in the
1191 // worker-count vs shard-count snapshot only changes the
1192 // phase2 deadline by at most one `max_delay` step, which
1193 // is bounded by the outer 2s cap regardless.
1194 let n_workers = usize::from(self.shard_manager.num_shards());
1195 let phase2_budget = self
1196 .config
1197 .batch
1198 .max_delay
1199 .saturating_mul(n_workers.max(1) as u32);
1200 let phase2_deadline =
1201 tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));
1202
1203 // Inner poll cadence: re-check the counters every 1 ms (or
1204 // `max_delay / 16`, whichever is larger). The fast cadence
1205 // means we exit promptly once the BatchWorker dispatches,
1206 // rather than waking exactly once per `max_delay` and
1207 // potentially racing the dispatch by a few ms.
1208 let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
1209 loop {
1210 let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1211 let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1212 // The barrier: every event ingested pre-flush has been
1213 // either dispatched or dropped. The bus's invariant
1214 // `events_ingested = events_dispatched + events_dropped`
1215 // holds at quiescence; we wait until
1216 // `dispatched + dropped >= target_ingested`.
1217 //
1218 // Pre-fix this used `dispatched + (dropped -
1219 // dropped_at_start) >= target_ingested`, which under-
1220 // counted by `dropped_at_start`: even after every
1221 // pre-flush event was processed, the inequality
1222 // required `dropped_at_start` MORE post-flush events
1223 // before signalling done. Workloads with no post-flush
1224 // ingest hung at the barrier until the deadline fired.
1225 //
1226 // Cross-shard race remains: a fast shard's post-flush
1227 // dispatches can satisfy the global target while a
1228 // slow shard's pre-flush events linger in its mpsc
1229 // channel or pending batch. `all_shards_empty()`
1230 // checks ring buffers but not those downstream
1231 // queues. Operators relying on flush as a hard
1232 // delivery barrier should call it during quiet
1233 // ingest, or in `shutdown` (which gates ingest via
1234 // `try_enter_ingest`).
1235 let _ = dispatched_at_start; // reserved for future per-shard accounting
1236 if dispatched.saturating_add(dropped) >= target_ingested
1237 && self.shard_manager.all_shards_empty()
1238 {
1239 break;
1240 }
1241 if tokio::time::Instant::now() >= phase2_deadline {
1242 tracing::warn!(
1243 target = target_ingested,
1244 dispatched,
1245 dropped = dropped.saturating_sub(dropped_at_start),
1246 "flush: Phase 2 deadline reached before all ingested events were dispatched",
1247 );
1248 break;
1249 }
1250 tokio::time::sleep(poll_interval).await;
1251 }
1252
1253 // Phase 3: tell the adapter to flush whatever it has
1254 // buffered. Bounded by `adapter_timeout` so a hanging
1255 // adapter can't pin us forever.
1256 match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
1257 Ok(r) => r,
1258 Err(_) => {
1259 tracing::warn!(
1260 "flush: adapter.flush timed out after {:?}",
1261 self.config.adapter_timeout
1262 );
1263 Err(AdapterError::Fatal("adapter flush timed out".into()))
1264 }
1265 }
1266 }
1267
1268 /// Gracefully shut down the event bus.
1269 ///
1270 /// The shutdown order is load-bearing:
1271 ///
1272 /// 1. Signal `shutdown` so drain workers stop pulling from
1273 /// ring buffers after their final sweep.
1274 /// 2. Await **drain workers** so every event the producer
1275 /// has handed to the bus is now in the per-shard mpsc
1276 /// channel.
1277 /// 3. Drop `batch_senders` so each channel's last sender is
1278 /// gone — the next `recv().await` in a batch worker will
1279 /// return `None`.
1280 /// 4. Await **batch workers**, which drain everything
1281 /// remaining in their channel and exit on `recv() = None`.
1282 ///
1283 /// Reversing steps 2 and 4 (the previous design) silently
1284 /// dropped events: a batch worker that exited on the shutdown
1285 /// flag could leave events the drain worker pushed *after* its
1286 /// `try_recv` sweep stranded in the channel.
1287 pub async fn shutdown(self) -> Result<(), AdapterError> {
1288 self.shutdown_via_ref().await
1289 }
1290
1291 /// Shutdown via shared reference — same semantics as
1292 /// [`shutdown`](Self::shutdown), but does not consume `self`.
1293 ///
1294 /// Useful for callers that hold the bus behind `Arc<EventBus>`
1295 /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1296 /// into every `EventStream`) and therefore cannot satisfy
1297 /// `Arc::try_unwrap`. Idempotent: the first caller does the
1298 /// work; concurrent or subsequent callers wait for the
1299 /// `shutdown_completed` flag and return `Ok(())`.
1300 pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1301 // 1. CAS the shutdown flag false→true. SeqCst pairs with
1302 // `try_enter_ingest`'s shutdown check — any producer that
1303 // observed the previous `false` and is mid-push has its
1304 // `in_flight_ingests` increment ordered before this store
1305 // (the CAS-success branch is a release of the new `true`),
1306 // and so will be visible to the wait below.
1307 //
1308 // If the CAS loses (someone else — typically a concurrent
1309 // call or `Drop` — already flipped the flag), spin until
1310 // they finish. We can't run the rest of the body because
1311 // workers/senders may already be partially torn down.
1312 if self
1313 .shutdown
1314 .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1315 .is_err()
1316 {
1317 // Bound the wait so a `Drop`-only path (which sets
1318 // `shutdown=true` but never sets `shutdown_completed`)
1319 // doesn't spin forever.
1320 //
1321 // Distinguish the two outcomes for callers. If
1322 // `shutdown_completed` flips inside the window, we
1323 // return `Ok(())` and the caller can be sure the first
1324 // caller finished. If the deadline fires first, we
1325 // surface `AdapterError::Transient(_)` — the bus IS
1326 // being shut down (the flag is set), but completion is
1327 // not yet observable; the caller can treat this as
1328 // "another thread is working on it, retry the
1329 // is_shutdown_completed() poll if you need a hard
1330 // barrier."
1331 //
1332 // Returning `Ok(())` in both branches would let
1333 // shutdown-done assumptions silently drift under a slow
1334 // adapter (`adapter_timeout` default 30 s > the 10 s
1335 // spin deadline), letting subsequent code observe a
1336 // partially-shut-down bus.
1337 // 10s in production builds; overridable via the
1338 // `_TEST_OVERRIDE_SHUTDOWN_VIA_REF_DEADLINE` thread-local
1339 // in test builds so the slow-first-caller test doesn't
1340 // need to wall-clock-wait for the full deadline. The
1341 // override is `#[cfg(test)]`-only; production cargo
1342 // builds compile out the override entirely.
1343 let deadline_dur = shutdown_via_ref_spin_deadline();
1344 // Use `tokio::time::Instant` so tests using
1345 // `tokio::time::pause()` virtualize this clock too.
1346 // Pre-fix `std::time::Instant` was wall-clock and
1347 // ignored `pause()`, breaking timeout-bounded tests
1348 // that wanted to fast-forward the spin deadline.
1349 let deadline = tokio::time::Instant::now() + deadline_dur;
1350 while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1351 if tokio::time::Instant::now() >= deadline {
1352 return Err(AdapterError::Transient(
1353 "shutdown_via_ref: another caller is mid-shutdown; \
1354 deadline elapsed before shutdown_completed \
1355 flipped. The bus IS shutting down; poll \
1356 is_shutdown_completed() if you need a hard \
1357 barrier."
1358 .into(),
1359 ));
1360 }
1361 // `yield_now` re-queues immediately and keeps the
1362 // task hot, starving the workers we're waiting on
1363 // under contention. A short `sleep` parks the task
1364 // and lets the runtime schedule the workers.
1365 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1366 }
1367 return Ok(());
1368 }
1369
1370 // 1a. Wait for in-flight ingests to drain BEFORE the drain
1371 // workers do their final ring-buffer sweep. Otherwise a
1372 // producer that observed `shutdown=false` could push *after*
1373 // the drain worker's last `pop_batch_into` returned zero,
1374 // leaving the event stranded in the ring buffer when the bus
1375 // is dropped.
1376 //
1377 // This is bounded: every producer either bails on the
1378 // SeqCst-synchronized shutdown check (no progress past the
1379 // increment) or completes its single non-blocking push and
1380 // decrements. Both paths take constant time; the total
1381 // wait is O(producer threads).
1382 //
1383 // The "every observed in-flight ingest completes before
1384 // the final sweep" property holds under normal conditions,
1385 // but the 5-second deadline below forces the gate open
1386 // even when producers are still in their push window. A
1387 // producer that has incremented `in_flight_ingests` (and
1388 // so observed `shutdown=false`) but whose push is delayed
1389 // past the deadline (heavy contention, debugger hit, etc.)
1390 // will complete its push AFTER the final sweep — its event
1391 // lands in the ring buffer and is never read. The deadline
1392 // exists so a stuck producer can't deadlock shutdown
1393 // indefinitely; the trade-off is documented data loss past
1394 // the 5 s window, surfaced via the `events_dropped` stat
1395 // (so the loss is observable to operators) and the `WARN`
1396 // log below (so it's diagnosable). The "no stranding"
1397 // promise on the happy path stands; the deadline path is
1398 // the documented escape hatch.
1399 // Use `tokio::time::Instant` so tests using
1400 // `tokio::time::pause()` virtualize this 5-second
1401 // deadline too — pre-fix the `std::time::Instant`
1402 // was wall-clock and ignored the test's paused clock.
1403 let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1404 // Snapshot of `(stranded, ingested, dispatched)` at the
1405 // deadline — Some(...) iff we hit the deadline path. The
1406 // post-drain reconciliation reads this to compute the
1407 // actual drop count (see comment further down).
1408 let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1409 while self.in_flight_ingests.load(AtomicOrdering::SeqCst) > 0 {
1410 if tokio::time::Instant::now() >= in_flight_deadline {
1411 let stranded = self.in_flight_ingests.load(AtomicOrdering::SeqCst);
1412 let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1413 let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1414 tracing::warn!(
1415 in_flight = stranded,
1416 lossy = true,
1417 "shutdown timed out waiting for in-flight ingests after 5s; \
1418 proceeding — up to {} events may strand in the ring buffer \
1419 past final drain (documented data-loss path)",
1420 stranded,
1421 );
1422 // Set the lossy flag immediately so a fast `is_*`
1423 // poll observes the outcome before the drain
1424 // finishes. The actual `events_dropped` bump is
1425 // deferred until after the final drain runs (see
1426 // "post-drain reconciliation" below) so we don't
1427 // double-count events that the drain still
1428 // successfully delivers — pre-fix this bumped
1429 // `events_dropped += stranded` here and the same
1430 // events that the final sweep then drained landed
1431 // in BOTH `events_ingested` and `events_dropped`,
1432 // breaking the bus's
1433 // `ingested == dispatched + dropped` invariant
1434 // and turning `shutdown_was_lossy` into a false
1435 // positive on every deadline-triggered shutdown.
1436 self.stats
1437 .shutdown_was_lossy
1438 .store(true, AtomicOrdering::Release);
1439 deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1440 break;
1441 }
1442 // Park instead of `yield_now`. The producers we're
1443 // waiting on contend for the same runtime threads;
1444 // re-queuing immediately starves their progress.
1445 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1446 }
1447
1448 // 1b. Release the drain-finalize gate.
1449 //
1450 // Pre-fix used `Ordering::Release` for this store
1451 // and relied on the SeqCst spin above (loading
1452 // `in_flight_ingests`) to provide the happens-before for
1453 // every observed-pre-shutdown push. That works today
1454 // because SeqCst loads carry an implicit fence, but a
1455 // future change to the spin's ordering (Relaxed for perf,
1456 // say) would silently break the drain worker's final-sweep
1457 // contract — producer pushes might not be visible to
1458 // `pop_batch_into`. Promote to SeqCst so the load-bearing
1459 // happens-before is explicit at this site, not derived
1460 // from another atomic's ordering choice.
1461 //
1462 // Caveat: the SeqCst happens-before above only covers the
1463 // *non-deadline* exit from the spin (the loop condition
1464 // observed `in_flight_ingests == 0`). On the deadline-break
1465 // path the loop exits with `in_flight_ingests > 0` — the
1466 // outstanding producer pushes have NOT been observed
1467 // synchronized-with this thread, and the SeqCst store below
1468 // does not retroactively create a happens-before with
1469 // pushes that are still mid-flight. Those events are
1470 // exactly the "stranded" events accounted via
1471 // `events_dropped` and `shutdown_was_lossy`; the contract
1472 // is "every observed-pre-shutdown push is visible to the
1473 // final sweep on the happy path; on the deadline path,
1474 // up to `stranded` events past the gate are surfaced as
1475 // dropped". The `shutdown_was_lossy` flag (set in step
1476 // 1a's deadline branch) is the operator-visible signal
1477 // that this contract was exercised on the lossy branch.
1478 self.drain_finalize_ready
1479 .store(true, AtomicOrdering::SeqCst);
1480
1481 // Stop the scaling monitor first — it's independent of the
1482 // ingestion path and just needs to observe the flag.
1483 let scaling_handle = self.scaling_monitor.lock().take();
1484 if let Some(handle) = scaling_handle {
1485 let _ = handle.await;
1486 }
1487
1488 // Take workers without holding the lock across await.
1489 let workers = std::mem::take(&mut *self.batch_workers.lock());
1490
1491 // 2. Await drain workers. Each one pops a final batch (up
1492 // to 10k events) from its ring buffer, sends it on the
1493 // channel, and exits. After this loop, every event in
1494 // the ring buffers has been pushed to its channel.
1495 //
1496 // `join_all` lets the runtime overlap drain handles. A
1497 // sequential `for ... { drain.await; }` would serialize
1498 // shutdown wall-clock as N×T instead of max(T), which on
1499 // the default 1024-shard config × per-shard final-drain
1500 // time makes shutdown painful.
1501 let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1502 .into_iter()
1503 .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1504 ((shard_id, drain), (shard_id, batch))
1505 })
1506 .unzip();
1507
1508 // Surface drain-worker JoinErrors explicitly. The default
1509 // Tokio runtime does NOT log spawned-task panics, so a
1510 // `let _ = join_all(...)` would silently swallow a panic
1511 // and mask stranded events. tracing::error per failure
1512 // makes the incident grep-able post-mortem.
1513 let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1514 let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1515 for (shard_id, result) in drain_ids
1516 .iter()
1517 .copied()
1518 .zip(futures::future::join_all(drain_handles).await)
1519 {
1520 if let Err(e) = result {
1521 tracing::error!(
1522 shard_id,
1523 error = %e,
1524 "drain worker JoinHandle errored on shutdown await"
1525 );
1526 }
1527 }
1528
1529 // 3. Drop the original senders so the channels close once
1530 // drain-worker sender clones (already dropped above)
1531 // are gone too. Without this, batch workers would block
1532 // on `recv().await` forever.
1533 drop(std::mem::take(&mut *self.batch_senders.write()));
1534
1535 // 4. Await batch workers. They drain their channel until
1536 // `recv() = None`, flush, and exit.
1537 //
1538 // Same parallelization as the drain phase, with the same
1539 // explicit JoinError surfacing.
1540 let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1541 for (shard_id, result) in drain_ids
1542 .into_iter()
1543 .zip(futures::future::join_all(batch_only).await)
1544 {
1545 if let Err(e) = result {
1546 tracing::error!(
1547 shard_id,
1548 error = %e,
1549 "BatchWorker JoinHandle errored on shutdown await"
1550 );
1551 }
1552 }
1553
1554 // Flush and shutdown adapter (with timeout to prevent hanging)
1555 let timeout = self.config.adapter_timeout;
1556 if tokio::time::timeout(timeout, self.adapter.flush())
1557 .await
1558 .is_err()
1559 {
1560 tracing::error!("Adapter flush timed out during shutdown");
1561 }
1562 let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1563 .await
1564 .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1565
1566 // Post-drain reconciliation for the lossy-shutdown path.
1567 //
1568 // If we hit the in-flight deadline above, `deadline_snapshot`
1569 // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1570 // Some of those `stranded` producers' events landed in the
1571 // ring AFTER our deadline check but BEFORE the
1572 // `drain_finalize_ready` gate flipped — those events are
1573 // now successfully ingested, drained, and dispatched through
1574 // the adapter. They appear in `events_dispatched` (the
1575 // delta since the deadline), so:
1576 //
1577 // actual_drops = stranded
1578 // - (dispatched_after_drain - dispatched@deadline)
1579 // - (ingested_after_drain - ingested@deadline)
1580 // only counting events that landed but
1581 // weren't dispatched (dropped under
1582 // backpressure, etc.)
1583 //
1584 // The cleaner reconciliation: events that completed
1585 // `try_enter_ingest` AFTER the deadline either completed
1586 // ingest (bumping `events_ingested`) or were dropped on
1587 // backpressure (bumping `events_dropped` from the existing
1588 // backpressure paths). The `stranded - delta_ingested`
1589 // remainder is producers whose `try_enter_ingest` succeeded
1590 // but never reached `shard_manager.ingest()` — those are
1591 // the genuinely-lost events we should account for.
1592 if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1593 let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1594 let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1595 // Known under-count window: a producer that completed
1596 // `shard_manager.ingest()` and bumped `events_ingested`
1597 // just BEFORE its `IngestGuard` decremented
1598 // `in_flight_ingests` will be counted in
1599 // `ingested_at_deadline` (already past the bump) AND in
1600 // `stranded` (still pending the decrement on that same
1601 // spin). That single event contributes `+1` to both
1602 // sides of the subtraction, so the saturating_sub
1603 // under-counts the drop by 1 per such interleaving.
1604 // The opposite direction (producer in-flight at
1605 // deadline that never completed ingest) is correctly
1606 // counted as a drop, so the bias is one-sided and
1607 // small (bounded by the number of producers mid-push
1608 // at the exact deadline moment — typically 0–1 even
1609 // under heavy load). Operators reading
1610 // `shutdown_was_lossy` get the right boolean; the
1611 // numeric `events_dropped` may under-count by a few
1612 // events on a lossy shutdown. Acceptable; the cost of
1613 // closing the window is paired-SeqCst reloads under
1614 // the spin which would dominate the shutdown path.
1615 let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1616 if actual_drops > 0 {
1617 self.stats
1618 .events_dropped
1619 .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1620 } else {
1621 // The deadline tripped the eager
1622 // `shutdown_was_lossy = true` set above, but the
1623 // final drain ingested every stranded event so
1624 // nothing was actually lost. Clear the flag so
1625 // operator dashboards alerting on
1626 // `was_lossy && events_dropped == 0` don't see a
1627 // false positive. Pre-fix the boolean stayed
1628 // `true` against `events_dropped == 0` for any
1629 // deadline-triggered shutdown whose drain
1630 // happened to catch up.
1631 self.stats
1632 .shutdown_was_lossy
1633 .store(false, AtomicOrdering::Release);
1634 }
1635 tracing::warn!(
1636 stranded_at_deadline = stranded,
1637 post_deadline_ingests,
1638 actual_drops,
1639 "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1640 by stranded - post-deadline-ingests (pre-fix this bumped by \
1641 the full `stranded` count, double-counting events the drain \
1642 still successfully delivered)",
1643 );
1644 }
1645
1646 // Mark shutdown as completed so Drop knows not to warn.
1647 self.shutdown_completed.store(true, AtomicOrdering::Release);
1648 result
1649 }
1650
1651 /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1652 /// not imply the shutdown work has finished. Use
1653 /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1654 /// completion.
1655 pub fn is_shutdown(&self) -> bool {
1656 self.shutdown.load(AtomicOrdering::Acquire)
1657 }
1658
1659 /// True once `shutdown` / `shutdown_via_ref` has fully drained
1660 /// workers and the adapter shutdown returned (success path only).
1661 pub fn is_shutdown_completed(&self) -> bool {
1662 self.shutdown_completed.load(AtomicOrdering::Acquire)
1663 }
1664
1665 /// Get shard metrics (if dynamic scaling is enabled).
1666 pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1667 self.shard_manager.collect_metrics()
1668 }
1669
1670 /// Check if dynamic scaling is enabled.
1671 pub fn is_dynamic_scaling_enabled(&self) -> bool {
1672 self.config.scaling.is_some()
1673 }
1674
1675 /// Manually trigger a scale-up (for testing or manual intervention).
1676 ///
1677 /// Bypasses the auto-scaling cooldown so a deliberate operator
1678 /// request isn't rate-limited by the auto-scaling cadence.
1679 /// Pre-fix this looped `add_shard_internal()` N times, each
1680 /// of which bumped `last_scaling`, so iteration 1+ failed
1681 /// with `InCooldown` against any non-zero cooldown — the
1682 /// first shard was left half-added (workers spawned, routing
1683 /// entry installed) while the error propagated to the
1684 /// caller. The `max_shards` budget check still applies.
1685 pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1686 let mut new_ids = Vec::with_capacity(count as usize);
1687 for _ in 0..count {
1688 let id = self.add_shard_internal_force().await?;
1689 new_ids.push(id);
1690 }
1691 Ok(new_ids)
1692 }
1693
1694 /// Manually trigger a scale-down (for testing or manual intervention).
1695 ///
1696 /// Marks `count` shards as `Draining`, waits for them to empty,
1697 /// finalizes them to `Stopped`, and removes them from the
1698 /// routing table — mirroring the scaling monitor's per-tick
1699 /// finalize loop. Returns the IDs of shards that were
1700 /// successfully drained AND removed (subset of those marked
1701 /// Draining if any failed to empty within the deadline).
1702 ///
1703 /// Drives the full scale-down lifecycle synchronously: a
1704 /// plain `mapper.scale_down` call marks shards `Draining` but
1705 /// does NOT finalize them — finalization is the scaling
1706 /// monitor's responsibility. Bus configs without an active
1707 /// monitor (or callers that shut down before the monitor's
1708 /// next tick) would otherwise lose any events queued in those
1709 /// shards' ring buffers.
1710 pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1711 let mapper = self
1712 .shard_manager
1713 .mapper()
1714 .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1715
1716 let drained_ids = mapper
1717 .scale_down(count)
1718 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1719
1720 // `finalize_draining` requires the shard to have been
1721 // Draining for >100ms with an empty ring buffer and no
1722 // pushes since drain start. Poll until every requested
1723 // shard finalizes, capped by an outer deadline so a wedged
1724 // producer can't pin this method forever. Use
1725 // `tokio::time::Instant` so tests under `tokio::time::pause()`
1726 // advance the virtual clock via `sleep` rather than spinning
1727 // until wall-clock catches up.
1728 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1729 let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1730 let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1731
1732 while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1733 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1734 let stopped = mapper.finalize_draining();
1735 // `finalize_draining` is destructive — every qualifying
1736 // Draining shard transitions to Stopped in one shot,
1737 // regardless of who initiated the drain. Pre-fix the
1738 // `if target.contains(&shard_id)` filter dropped non-
1739 // target ids on the floor; if the scaling monitor (or
1740 // a parallel `manual_scale_down` on a different target
1741 // set) finalized one of THEIR shards in the same tick,
1742 // that shard ended up Stopped with workers + routing
1743 // entry intact — leaked. Always tear down via
1744 // `remove_shard_internal` so the bus-side state
1745 // (workers, sender, routing) is freed; only count the
1746 // target subset toward the returned Vec.
1747 for shard_id in stopped {
1748 let _ = self.remove_shard_internal(shard_id).await;
1749 if target.contains(&shard_id) {
1750 finalized.insert(shard_id);
1751 }
1752 }
1753 }
1754
1755 // Surface partial success at WARN level. The return shape
1756 // is preserved for compat (changing to `(Vec, Vec)` would
1757 // break the existing callers + test) so a smaller-than-
1758 // target list could otherwise be silently mistaken for full
1759 // success; the WARN log gives operations tooling something
1760 // to alert on.
1761 if finalized.len() < target.len() {
1762 let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1763 tracing::warn!(
1764 requested = target.len(),
1765 finalized = finalized.len(),
1766 still_draining = ?still_draining,
1767 "manual_scale_down deadline elapsed before all targeted \
1768 shards finalized — events still in-flight on the listed \
1769 shards. They will finalize on the next scaling-monitor \
1770 tick or on shutdown."
1771 );
1772 }
1773
1774 Ok(finalized.into_iter().collect())
1775 }
1776}
1777
1778impl Drop for EventBus {
1779 fn drop(&mut self) {
1780 // Signal shutdown so background tasks (drain workers, batch
1781 // workers, scaling monitor) observe the flag and exit. We
1782 // cannot await futures in Drop, but setting the atomic flag
1783 // triggers eventual termination.
1784 //
1785 // Previously used `Release` here while `try_enter_ingest`
1786 // and `shutdown()` use `SeqCst` on the same flag. `&mut self`
1787 // exclusion makes that sound today (no concurrent ingest can
1788 // observe a half-published shutdown). The mismatch is purely
1789 // defensive — switching to `SeqCst` matches the rest of the
1790 // lifecycle and removes a footgun if a future change ever
1791 // opened a path where `Drop` overlaps an in-flight
1792 // `try_enter_ingest`.
1793 self.shutdown.store(true, AtomicOrdering::SeqCst);
1794 // Also release the drain-finalize gate so any drain worker
1795 // already parked waiting for it can proceed and exit. Without
1796 // this, drop-without-shutdown leaves drain workers blocked on
1797 // `drain_finalize_ready` until their internal deadline fires
1798 // (which delays task cleanup by `DRAIN_FINALIZE_TIMEOUT`).
1799 // Best-effort durability: drop never gets the in-flight wait,
1800 // so any push that lands after this point is still lost.
1801 self.drain_finalize_ready
1802 .store(true, AtomicOrdering::SeqCst);
1803
1804 // Workers do NOT hold `Arc<EventBus>` — they hold
1805 // independent `Arc<ShardManager>` / `Arc<dyn Adapter>`
1806 // clones plus the channel halves. When `Drop` returns,
1807 // those Arcs survive in the still-running tasks and they
1808 // continue draining / dispatching until they observe the
1809 // shutdown flag we just set. There's no partial-Drop UB
1810 // risk: nothing on the worker side dereferences a
1811 // dropped EventBus field. The flags promote the
1812 // worker tasks from "blocked on recv / parked on
1813 // drain_finalize_ready" to "observe shutdown=true and
1814 // exit" so they don't linger indefinitely.
1815 //
1816 // If `shutdown()` was never awaited, any events still in the
1817 // per-shard ring buffers or mpsc channels are lost — the
1818 // adapter's `flush()` and `shutdown()` won't run, so durable
1819 // backends never see them. We can't fix that from `Drop` (no
1820 // async), but we *can* surface the data-loss risk loudly so
1821 // it doesn't hide. The check is bounded to "shutdown was
1822 // never started"; an in-progress shutdown is fine because the
1823 // call site is awaiting it.
1824 if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1825 // Count events still sitting in shard ring buffers. They
1826 // are stranded — the drain workers will see `shutdown =
1827 // true` and exit without flushing, the adapter's
1828 // `flush()`/`shutdown()` never run, so anything in the
1829 // rings at this point is permanently lost. Surface that
1830 // loss via `events_dropped` so post-mortem stats reflect
1831 // reality (operators alerting on `events_dropped > 0`
1832 // would otherwise miss the entire incident), and set
1833 // `shutdown_was_lossy` so the boolean view is consistent
1834 // with the counter view.
1835 //
1836 // Events in the BatchWorker mpsc channels or pending
1837 // batches are not counted here — those workers may still
1838 // observe the shutdown flag and exit, but we have no
1839 // synchronous way from Drop to enumerate them. The ring-
1840 // buffer count is a lower bound on the stranded total.
1841 //
1842 // Use the non-blocking accessor: if `Drop` runs on a
1843 // thread that already holds a shard mutex (single-thread
1844 // runtime + panic during shutdown is the canonical
1845 // hazard), the blocking `total_pending_in_rings` would
1846 // self-deadlock. Best-effort accounting is the right
1847 // trade-off here — we'd rather under-report stranded
1848 // events than wedge the drop forever.
1849 let (stranded_in_rings, uncounted_shards) =
1850 self.shard_manager.try_total_pending_in_rings();
1851 if uncounted_shards > 0 {
1852 tracing::warn!(
1853 uncounted_shards,
1854 "EventBus::drop: {uncounted_shards} shard(s) were locked at \
1855 drop time and could not be accounted for in stranded_in_rings"
1856 );
1857 }
1858 if stranded_in_rings > 0 {
1859 self.stats
1860 .events_dropped
1861 .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
1862 self.stats
1863 .shutdown_was_lossy
1864 .store(true, AtomicOrdering::Release);
1865 }
1866
1867 let stats = self.shard_manager.stats();
1868 tracing::warn!(
1869 events_ingested = stats.events_ingested,
1870 events_dropped = stats.events_dropped,
1871 stranded_in_rings,
1872 "EventBus dropped without an awaited shutdown(). Any in-flight \
1873 events still in the ring buffers or batch channels will be lost \
1874 — the adapter's flush()/shutdown() never ran. Call \
1875 `bus.shutdown().await` before dropping for durable shutdown."
1876 );
1877 }
1878 }
1879}
1880
/// How long the second-caller path in `shutdown_via_ref` may spin.
///
/// Non-test builds always use a fixed 10 seconds; test builds
/// compile a separate, overridable variant instead.
#[cfg(not(test))]
fn shutdown_via_ref_spin_deadline() -> Duration {
    const PRODUCTION_SPIN_DEADLINE_SECS: u64 = 10;
    Duration::from_secs(PRODUCTION_SPIN_DEADLINE_SECS)
}
1887
/// Test-only override for the `shutdown_via_ref` spin deadline, in
/// milliseconds.
///
/// `0` (the initial value) means "no override": the test build then
/// falls back to the same 10 s used in production. A non-zero value
/// lets a test reach the deadline-elapsed path without waiting the
/// full 10 s of wall-clock time. Written through
/// [`set_shutdown_via_ref_spin_deadline_for_test`].
///
/// The static is process-global and therefore shared by every test
/// in the same test binary. Tests that override it must hold
/// [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] from before they set the
/// value until after they reset it to `0`, otherwise one test's
/// override can leak into another test's expectations.
#[cfg(test)]
static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: AtomicU64 = AtomicU64::new(0);
1904
/// Mutex that serializes tests touching the spin-deadline override.
///
/// A test that overrides the deadline acquires this lock through
/// [`shutdown_deadline_override_lock`] and keeps the guard until it
/// has reset the override to `0`, so concurrently running tests
/// never observe each other's transient values.
///
/// This is a `tokio::sync::Mutex` on purpose: the guard has to stay
/// alive across `.await` points while the guarded test runs, which a
/// `std::sync::Mutex` guard should not do (and
/// `clippy::await_holding_lock` would flag).
#[cfg(test)]
static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> =
    tokio::sync::Mutex::const_new(());
1918
/// Locks [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] and returns the guard.
///
/// Tests that change the deadline override keep the returned guard
/// alive across both the "set" and the "reset to 0" step, so every
/// other test sees the default while it holds the lock itself.
#[cfg(test)]
pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
    let override_mutex = &SHUTDOWN_DEADLINE_OVERRIDE_GUARD;
    override_mutex.lock().await
}
1927
/// Test-build variant of the `shutdown_via_ref` spin deadline.
///
/// Reads `SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS`: `0` selects the
/// production value of 10 s, any other value is used as the deadline
/// in milliseconds.
#[cfg(test)]
fn shutdown_via_ref_spin_deadline() -> Duration {
    match SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(AtomicOrdering::Relaxed) {
        0 => Duration::from_secs(10),
        override_ms => Duration::from_millis(override_ms),
    }
}
1937
/// Test-only: sets the spin-deadline override to `ms` milliseconds.
///
/// Passing `0` removes the override again. Callers are expected to
/// hold the guard from `shutdown_deadline_override_lock` while an
/// override is in effect, because the value is process-global.
#[cfg(test)]
pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
    let override_ms = &SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS;
    override_ms.store(ms, AtomicOrdering::Relaxed);
}
1942
/// Test-only hooks for driving the in-flight ingest accounting by
/// hand, without going through the real ingest path.
#[cfg(test)]
impl EventBus {
    /// Test-only: pretends `n` producers are in the middle of an
    /// ingest call by adding `n` to `in_flight_ingests`.
    ///
    /// The lossy-shutdown reconciliation tests use this to raise the
    /// counter without touching `events_ingested`, which the real
    /// ingest path would also bump and which would blur the
    /// (stranded, post-deadline-ingests) arithmetic under test.
    ///
    /// The caller owns the lifecycle: every staged slot should be
    /// given back with [`release_stranded_ingest`](Self::release_stranded_ingest)
    /// or [`complete_stranded_ingest`](Self::complete_stranded_ingest)
    /// before the bus is dropped.
    pub(crate) fn stage_stranded_ingest(&self, n: u64) {
        self.in_flight_ingests.fetch_add(n, AtomicOrdering::SeqCst);
    }

    /// Test-only: gives back `n` staged slots *without* recording
    /// any ingest.
    ///
    /// Models producers that were stranded by the shutdown deadline
    /// and never completed; the reconciliation is expected to count
    /// them as drops. Only `in_flight_ingests` is decremented.
    pub(crate) fn release_stranded_ingest(&self, n: u64) {
        self.in_flight_ingests.fetch_sub(n, AtomicOrdering::SeqCst);
    }

    /// Test-only: models `n` previously staged producers finishing
    /// their ingest after the shutdown deadline.
    ///
    /// `events_ingested` is bumped first (Release; the post-drain
    /// reconciliation loads it with Acquire) and only then are the
    /// in-flight slots released, so the counters change in the order
    /// "ingested, then no longer in flight".
    pub(crate) fn complete_stranded_ingest(&self, n: u64) {
        let ingested = &self.stats.events_ingested;
        ingested.fetch_add(n, AtomicOrdering::Release);
        self.in_flight_ingests.fetch_sub(n, AtomicOrdering::SeqCst);
    }
}
1990
1991async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
1992 // Refresh `interval` from the policy on every tick. The previous
1993 // version cached it once at task start, so any future runtime
1994 // policy update would not be adopted by the monitor without a
1995 // process restart. Today `EventBusConfig` is immutable
1996 // post-construction so this is a no-op — but reading it each tick
1997 // is cheap (one atomic / RwLock read) and removes the latent
1998 // footgun.
1999 loop {
2000 let interval = match weak.upgrade() {
2001 Some(bus) => match &bus.config.scaling {
2002 Some(p) => p.metrics_window,
2003 None => return,
2004 },
2005 None => return,
2006 };
2007 tokio::time::sleep(interval).await;
2008
2009 let bus = match weak.upgrade() {
2010 Some(b) => b,
2011 // Last strong ref dropped — caller is shutting down (or
2012 // already gone). Exit cleanly.
2013 None => break,
2014 };
2015
2016 // SeqCst to match the writer side (`EventBus::shutdown` /
2017 // `Drop`). The Acquire/Release handshake on
2018 // `drain_finalize_ready` already provides the load-bearing
2019 // happens-before today — but a future code change that
2020 // piggybacks on `shutdown`'s ordering (e.g. a producer that
2021 // observes shutdown without going through
2022 // `try_enter_ingest`) would silently break under Relaxed.
2023 // Aligning the read-side ordering with the writer-side
2024 // SeqCst is a one-instruction tax for the safety.
2025 if bus.shutdown.load(AtomicOrdering::SeqCst) {
2026 break;
2027 }
2028
2029 // Collect metrics for observability.
2030 if let Some(metrics) = bus.shard_manager.collect_metrics() {
2031 for m in &metrics {
2032 if m.fill_ratio > 0.5 {
2033 tracing::debug!(
2034 shard_id = m.shard_id,
2035 fill_ratio = m.fill_ratio,
2036 event_rate = m.event_rate,
2037 "Shard metrics"
2038 );
2039 }
2040 }
2041 }
2042
2043 // Evaluate scaling.
2044 match bus.shard_manager.evaluate_scaling() {
2045 ScalingDecision::ScaleUp(count) => {
2046 tracing::info!(count = count, "Scaling up shards");
2047 for _ in 0..count {
2048 if let Err(e) = bus.add_shard_internal().await {
2049 tracing::error!(error = %e, "Failed to add shard");
2050 break;
2051 }
2052 }
2053 }
2054 ScalingDecision::ScaleDown(count) => {
2055 tracing::info!(count = count, "Scaling down shards");
2056 if let Some(mapper) = bus.shard_manager.mapper() {
2057 if let Ok(drained) = mapper.scale_down(count) {
2058 for shard_id in drained {
2059 let _ = bus.shard_manager.drain_shard(shard_id);
2060 }
2061 }
2062 }
2063 }
2064 ScalingDecision::None => {}
2065 }
2066
2067 if let Some(mapper) = bus.shard_manager.mapper() {
2068 let stopped = mapper.finalize_draining();
2069 for shard_id in stopped {
2070 let _ = bus.remove_shard_internal(shard_id).await;
2071 }
2072 }
2073
2074 // CRITICAL: drop the strong ref BEFORE the next sleep so a
2075 // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
2076 // the last strong ref while we're sleeping.
2077 drop(bus);
2078 }
2079}
2080
// NOTE: the summaries below describe `spawn_batch_worker` and
// `dispatch_batch` (both defined further down in this file), not
// `retry_backoff`. They are kept as plain `//` comments so rustdoc does
// not attach them to `retry_backoff`.
//
// `spawn_batch_worker`: Spawn a batch worker for a shard.
//
// `dispatch_batch`: Dispatch a batch to the adapter with timeout and
// optional retries. Returns true if the batch was accepted, false if
// all attempts failed.
//
// Non-retryable errors (e.g. `AdapterError::Connection`,
// `AdapterError::Fatal`, `AdapterError::Serialization`) skip the
// retry loop and drop the batch immediately. Retrying a fatal error
// just delays the inevitable while burning CPU and amplifying log
// noise. Use `AdapterError::is_retryable` as the single source of
// truth for this decision.
/// Computes how long `dispatch_batch` sleeps after failed attempt
/// number `attempt` (0-based) for shard `shard_id`.
///
/// The delay is an exponential backoff with deterministic jitter:
///
/// - base: `100 ms × 2^attempt`, with the exponent capped at 5, i.e.
///   100, 200, 400, 800, 1600, 3200 ms and 3200 ms from then on;
/// - jitter: an offset in `[-25 %, +25 %)` of the base, derived from
///   a hash of `(shard_id, attempt)`.
///
/// The same `(shard_id, attempt)` pair always yields the same delay,
/// while different shards get different offsets. That spreads the
/// retries of many shards apart instead of having them all hit a
/// recovering backend on one shared cadence, which is what a flat
/// 100 ms sleep after every failure used to do.
fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    const BASE_MS: u64 = 100;
    const MAX_DOUBLINGS: u32 = 5;

    // 100 ms doubled `attempt` times, at most five times.
    let doublings = attempt.min(MAX_DOUBLINGS);
    let base_ms = BASE_MS.saturating_mul(1u64 << doublings);

    // Per-(shard, attempt) value driving the jitter. The two fields
    // are fed to the hasher in this exact order.
    let seed = {
        let mut hasher = DefaultHasher::new();
        shard_id.hash(&mut hasher);
        attempt.hash(&mut hasher);
        hasher.finish()
    };

    // Pick an offset in [0, base/2) and shift it down by base/4 so it
    // is centered on zero.
    let jitter_range = (base_ms / 2).max(1);
    let offset = (seed % jitter_range) as i64;
    let centered = offset - (jitter_range as i64 / 2);

    // Never return a zero delay.
    let delay_ms = (base_ms as i64 + centered).max(1) as u64;
    Duration::from_millis(delay_ms)
}
2125
2126async fn dispatch_batch(
2127 adapter: &dyn Adapter,
2128 batch: Arc<Batch>,
2129 shard_id: u16,
2130 timeout: Duration,
2131 retries: u32,
2132) -> bool {
2133 // Retry attempts clone the `Arc` (refcount bump only); the final
2134 // attempt moves it. Pre-fix the batch was `Batch` and every retry
2135 // deep-cloned the events `Vec` — at the common `retries == 0`
2136 // path this was a wasted N×Bytes refcount bump + Vec alloc on
2137 // every batch.
2138 for attempt in 0..retries {
2139 match tokio::time::timeout(timeout, adapter.on_batch(Arc::clone(&batch))).await {
2140 Ok(Ok(())) => return true,
2141 Ok(Err(e)) => {
2142 if !e.is_retryable() {
2143 // Tag with a `reason` field so this
2144 // distinct drop cause is separately filterable
2145 // from retry-exhausted and timeout in
2146 // observability tools. Shutdown is distinguished
2147 // from generic non-retryable so an operator
2148 // chasing "why are batches being dropped" can
2149 // immediately tell a sequencing bug (sending to
2150 // a stopped adapter) from a transport / config
2151 // failure.
2152 let reason = if e.is_shutdown() {
2153 "adapter_shutdown"
2154 } else {
2155 "non_retryable"
2156 };
2157 tracing::error!(
2158 shard_id,
2159 error = %e,
2160 attempt,
2161 reason,
2162 "Non-retryable error from adapter, dropping batch"
2163 );
2164 return false;
2165 }
2166 tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2167 }
2168 Err(_) => {
2169 tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2170 }
2171 }
2172 tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2173 }
2174
2175 // Pre-fix the final attempt collapsed every drop into
2176 // one log line ("Failed to dispatch batch, dropping"), making
2177 // it impossible to tell retry-exhausted from fatal-non-
2178 // retryable from timeout in metrics. The non-retryable case
2179 // already has its own log inside the retry loop above (early
2180 // return); here we tag retry-exhausted vs timeout-after-
2181 // retries with distinct `reason` fields so log-based
2182 // observability tools can break the drops out by cause.
2183 match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2184 Ok(Ok(())) => true,
2185 Ok(Err(e)) => {
2186 tracing::error!(
2187 shard_id,
2188 error = %e,
2189 reason = "retry_exhausted",
2190 attempts = retries + 1,
2191 "Failed to dispatch batch after exhausting retries, dropping"
2192 );
2193 false
2194 }
2195 Err(_) => {
2196 tracing::error!(
2197 shard_id,
2198 reason = "timeout",
2199 attempts = retries + 1,
2200 "Adapter on_batch timed out on final attempt, dropping batch"
2201 );
2202 false
2203 }
2204 }
2205}
2206
2207struct BatchWorkerParams {
2208 shard_id: u16,
2209 rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
2210 adapter: Arc<dyn Adapter>,
2211 shard_manager: Arc<ShardManager>,
2212 config: BatchConfig,
2213 adapter_timeout: Duration,
2214 batch_retries: u32,
2215 /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
2216 /// stores its post-flush sequence here on every dispatch so the
2217 /// bus can read it after the worker exits — see
2218 /// `ShardWorkers::next_sequence` for the consumer side.
2219 next_sequence: Arc<AtomicU64>,
2220 /// Bus-level stats. The worker increments
2221 /// `batches_dispatched` and `events_dispatched` after every
2222 /// successful `dispatch_batch`. Both must actually be
2223 /// incremented here, otherwise `flush()`'s Phase 2 progress
2224 /// probe would always observe zero progress and early-break
2225 /// after a single `max_delay` window — racing the
2226 /// BatchWorker's first `recv_timeout` and flaking on
2227 /// Windows-class timer resolution.
2228 stats: Arc<EventBusStats>,
2229 /// Producer nonce stamped on every batch the worker emits.
2230 /// Bus-loaded from the persistent path when
2231 /// `producer_nonce_path` is configured, otherwise from the
2232 /// per-process default.
2233 producer_nonce: u64,
2234}
2235
2236fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2237 let BatchWorkerParams {
2238 shard_id,
2239 mut rx,
2240 adapter,
2241 shard_manager,
2242 config,
2243 adapter_timeout,
2244 batch_retries,
2245 next_sequence,
2246 stats,
2247 producer_nonce,
2248 } = params;
2249 tokio::spawn(async move {
2250 let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2251
2252 loop {
2253 // Wait for events with timeout. The batch worker exits
2254 // only when its channel is closed — i.e. after every
2255 // upstream sender (the drain worker for this shard +
2256 // `EventBus::batch_senders`) has been dropped.
2257 // `EventBus::shutdown` enforces that ordering so no
2258 // event is left stranded in the channel.
2259 let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2260
2261 match tokio::time::timeout(recv_timeout, rx.recv()).await {
2262 Ok(Some(events)) => {
2263 if let Some(batch) = worker.add_events(events) {
2264 let batch_len = batch.len() as u64;
2265 if dispatch_batch(
2266 &*adapter,
2267 Arc::new(batch),
2268 shard_id,
2269 adapter_timeout,
2270 batch_retries,
2271 )
2272 .await
2273 {
2274 stats
2275 .batches_dispatched
2276 .fetch_add(1, AtomicOrdering::Relaxed);
2277 stats
2278 .events_dispatched
2279 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2280 if let Some(shard_ref) = shard_manager.shard(shard_id) {
2281 shard_ref.lock().record_batch_dispatch();
2282 }
2283 }
2284 }
2285 }
2286 Ok(None) => {
2287 // Channel closed — drain any pending and exit.
2288 if worker.has_pending() {
2289 let batch = worker.flush();
2290 if !batch.is_empty() {
2291 let batch_len = batch.len() as u64;
2292 if dispatch_batch(
2293 &*adapter,
2294 Arc::new(batch),
2295 shard_id,
2296 adapter_timeout,
2297 batch_retries,
2298 )
2299 .await
2300 {
2301 stats
2302 .batches_dispatched
2303 .fetch_add(1, AtomicOrdering::Relaxed);
2304 stats
2305 .events_dispatched
2306 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2307 }
2308 }
2309 }
2310 break;
2311 }
2312 Err(_) => {
2313 // Timeout - check if we need to flush.
2314 // Pre-fix [perf #38] called `worker.add_events(vec![])`
2315 // as the signal, allocating an empty `Vec` per
2316 // timeout tick. `check_timeout` is the direct
2317 // expression of intent — no alloc.
2318 if let Some(batch) = worker.check_timeout() {
2319 let batch_len = batch.len() as u64;
2320 if dispatch_batch(
2321 &*adapter,
2322 Arc::new(batch),
2323 shard_id,
2324 adapter_timeout,
2325 batch_retries,
2326 )
2327 .await
2328 {
2329 stats
2330 .batches_dispatched
2331 .fetch_add(1, AtomicOrdering::Relaxed);
2332 stats
2333 .events_dispatched
2334 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2335 if let Some(shard_ref) = shard_manager.shard(shard_id) {
2336 shard_ref.lock().record_batch_dispatch();
2337 }
2338 }
2339 }
2340 }
2341 }
2342 }
2343 })
2344}
2345
/// Upper bound on how long a drain worker waits for the
/// `drain_finalize_ready` gate once it has observed `shutdown=true`.
///
/// This is a safety net, not the normal path: the shutdown path is
/// documented to always open the gate (even when it times out
/// itself), and `Drop` opens it as well. The deadline only matters
/// if the gate is somehow never set, in which case the worker must
/// not stay pinned forever.
const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_secs(10);
2353
2354/// Spawn a drain worker for a single shard.
2355///
2356/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle
2357/// allocation happens *outside* the shard mutex critical section.
2358/// Each cycle: lock → drain into scratch (no alloc, capacity already
2359/// reserved) → unlock → `mem::replace` swaps the filled scratch out
2360/// for a fresh empty `Vec` (alloc *outside* the lock) → send the
2361/// filled batch on the channel.
2362fn spawn_drain_worker_for_shard(
2363 shard_id: u16,
2364 shard_manager: Arc<ShardManager>,
2365 sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
2366 shutdown: Arc<AtomicBool>,
2367 drain_finalize_ready: Arc<AtomicBool>,
2368) -> JoinHandle<()> {
2369 const STEADY_BATCH: usize = 1_000;
2370 const FINAL_BATCH: usize = 10_000;
2371
2372 tokio::spawn(async move {
2373 let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);
2374
2375 loop {
2376 // SeqCst to match the writer side (`EventBus::shutdown` /
2377 // `Drop`). `try_enter_ingest` itself uses SeqCst, and
2378 // the Acquire/Release handshake on
2379 // `drain_finalize_ready` (below) is what actually makes
2380 // the producer-push happen-before visible. Aligning to
2381 // SeqCst here makes the contract robust to future
2382 // producer-side changes that might piggyback on
2383 // `shutdown`'s ordering.
2384 if shutdown.load(AtomicOrdering::SeqCst) {
2385 // Before doing the final sweep, wait for `shutdown()`
2386 // to release the finalize gate. The gate is set only
2387 // after the in-flight ingest counter reaches zero,
2388 // which means every producer that read `shutdown=false`
2389 // has completed its push. Without this wait, the drain
2390 // worker can race ahead of a late push under
2391 // shard-mutex serialization (drain takes the lock
2392 // first, sees nothing, exits; producer then takes the
2393 // lock and pushes — event stranded).
2394 //
2395 // Acquire pairs with the Release in `EventBus::shutdown`
2396 // and `EventBus::drop`, transitively making every
2397 // producer push that happened-before its `in_flight`
2398 // decrement visible to the subsequent `pop_batch_into`.
2399 // `tokio::time::Instant` so virtualized-clock tests
2400 // (`tokio::time::pause`) advance the deadline via
2401 // `sleep` rather than spinning until wall-clock catches
2402 // up.
2403 let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
2404 while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
2405 if tokio::time::Instant::now() >= finalize_deadline {
2406 tracing::warn!(
2407 shard_id,
2408 "drain worker timed out waiting for finalize gate; \
2409 proceeding with potential event loss"
2410 );
2411 break;
2412 }
2413 // Park instead of `yield_now` so we don't
2414 // starve the workers / producers we're waiting
2415 // on under contention.
2416 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
2417 }
2418
2419 // Final drain: loop until the ring buffer is empty.
2420 // A single 10k batch is not enough — the ring
2421 // buffer can hold up to `ring_buffer_capacity`
2422 // events (default 1M) and any leftover would be
2423 // silently lost on shutdown.
2424 //
2425 // Pre-fix this broke at the first
2426 // `popped == 0`. The audit posited a narrow race
2427 // where a producer that fetch_add'd
2428 // in_flight_ingests but stalled before the
2429 // shard-lock body could push AFTER shutdown
2430 // observed in_flight=0 yet BEFORE this final
2431 // sweep saw the event. The SeqCst guard pattern
2432 // makes this unlikely (the push happens-before
2433 // the guard drop), but the defense is cheap:
2434 // require TWO consecutive zero-event passes
2435 // before declaring drain. The yield_now between
2436 // them gives a stalled producer a chance to land
2437 // the push.
2438 let mut final_scratch: Vec<crate::event::InternalEvent> =
2439 Vec::with_capacity(FINAL_BATCH);
2440 let mut consecutive_zeros = 0u32;
2441 loop {
2442 let popped = shard_manager
2443 .with_shard(shard_id, |shard| {
2444 shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
2445 })
2446 .unwrap_or(0);
2447 if popped == 0 {
2448 consecutive_zeros += 1;
2449 if consecutive_zeros >= 2 {
2450 break;
2451 }
2452 // Yield to let any racing producer commit
2453 // its push, then re-poll.
2454 tokio::task::yield_now().await;
2455 continue;
2456 }
2457 consecutive_zeros = 0;
2458 let batch =
2459 std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
2460 let batch_len = batch.len();
2461 if let Err(_send_err) = sender.send(batch).await {
2462 // Batch worker exited before drain. The
2463 // `mem::replace` already pulled events out
2464 // of the ring buffer, so the dropped batch
2465 // is unrecoverable — the SendError carries
2466 // it back but the consumer is gone. Surface
2467 // the count loudly so the loss is
2468 // observable in operator dashboards rather
2469 // than a silent miss in shutdown stats.
2470 tracing::error!(
2471 shard_id,
2472 dropped = batch_len,
2473 "drain worker (final): batch worker dropped \
2474 channel before final drain completed; \
2475 events removed from ring buffer cannot be redelivered",
2476 );
2477 break;
2478 }
2479 }
2480 break;
2481 }
2482
2483 // Drain events from ring buffer.
2484 let popped = shard_manager.with_shard(shard_id, |shard| {
2485 shard.pop_batch_into(&mut scratch, STEADY_BATCH)
2486 });
2487
2488 match popped {
2489 Some(0) => {
2490 // No events — yield briefly. The 100μs sleep is deliberate:
2491 // this is a latency-first system where the drain loop is the
2492 // hot path. Longer backoff would add milliseconds of latency
2493 // to the first event after a quiet period, violating the
2494 // sub-microsecond design target. The CPU cost of 100μs polling
2495 // is acceptable for a system that processes 10M+ events/sec.
2496 tokio::time::sleep(Duration::from_micros(100)).await;
2497 }
2498 Some(_) => {
2499 let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
2500 let batch_len = batch.len();
2501 if let Err(_send_err) = sender.send(batch).await {
2502 // Steady-state: the only way the batch
2503 // worker drops the channel is if it
2504 // panicked or `remove_shard_internal`
2505 // tore it down out of order with the
2506 // drain worker (which the documented
2507 // shutdown sequence forbids). Either way,
2508 // the events are unrecoverable — the
2509 // `mem::replace` above already pulled them
2510 // out of the ring buffer. Pre-fix this
2511 // simply `break`-d, leaving the loss
2512 // invisible. Surface a loud error with
2513 // the dropped count so an out-of-order
2514 // shutdown or batch-worker panic shows up
2515 // in dashboards rather than as a silent
2516 // metric gap.
2517 tracing::error!(
2518 shard_id,
2519 dropped = batch_len,
2520 "drain worker: batch worker dropped channel \
2521 during steady-state drain; events removed from \
2522 ring buffer cannot be redelivered",
2523 );
2524 break;
2525 }
2526 }
2527 None => {
2528 // Shard no longer exists (was removed)
2529 break;
2530 }
2531 }
2532 }
2533 })
2534}
2535
2536#[cfg(test)]
2537mod tests {
2538 use super::*;
2539 use crate::shard::ScalingPolicy;
2540 use serde_json::json;
2541
2542 #[tokio::test]
2543 async fn test_event_bus_basic() {
2544 let config = EventBusConfig::builder()
2545 .num_shards(2)
2546 .ring_buffer_capacity(1024)
2547 .build()
2548 .unwrap();
2549
2550 let bus = EventBus::new(config).await.unwrap();
2551
2552 // Ingest some events
2553 for i in 0..10 {
2554 let event = Event::new(json!({"index": i}));
2555 bus.ingest(event).unwrap();
2556 }
2557
2558 // Give workers time to process
2559 tokio::time::sleep(Duration::from_millis(100)).await;
2560
2561 // Check stats
2562 assert_eq!(
2563 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2564 10
2565 );
2566
2567 bus.shutdown().await.unwrap();
2568 }
2569
2570 #[tokio::test]
2571 async fn test_event_bus_batch_ingest() {
2572 let config = EventBusConfig::default();
2573 let bus = EventBus::new(config).await.unwrap();
2574
2575 let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2576
2577 let ingested = bus.ingest_batch(events);
2578 assert_eq!(ingested, 100);
2579
2580 bus.shutdown().await.unwrap();
2581 }
2582
2583 #[tokio::test]
2584 async fn test_event_bus_with_dynamic_scaling() {
2585 let policy = ScalingPolicy {
2586 min_shards: 2,
2587 max_shards: 8,
2588 ..Default::default()
2589 };
2590
2591 let config = EventBusConfig::builder()
2592 .num_shards(2)
2593 .ring_buffer_capacity(1024)
2594 .scaling(policy)
2595 .build()
2596 .unwrap();
2597
2598 let bus = EventBus::new(config).await.unwrap();
2599
2600 // Verify dynamic scaling is enabled
2601 assert!(bus.is_dynamic_scaling_enabled());
2602 assert_eq!(bus.num_shards(), 2);
2603
2604 // Ingest some events
2605 for i in 0..100 {
2606 let event = Event::new(json!({"index": i}));
2607 bus.ingest(event).unwrap();
2608 }
2609
2610 // Give workers time to process
2611 tokio::time::sleep(Duration::from_millis(100)).await;
2612
2613 // Check stats
2614 assert_eq!(
2615 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2616 100
2617 );
2618
2619 bus.shutdown().await.unwrap();
2620 }
2621
2622 #[tokio::test]
2623 async fn test_manual_scale_up() {
2624 let policy = ScalingPolicy {
2625 min_shards: 2,
2626 max_shards: 8,
2627 cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2628 ..Default::default()
2629 };
2630
2631 let config = EventBusConfig::builder()
2632 .num_shards(2)
2633 .ring_buffer_capacity(1024)
2634 .scaling(policy)
2635 .build()
2636 .unwrap();
2637
2638 let bus = EventBus::new(config).await.unwrap();
2639
2640 assert_eq!(bus.num_shards(), 2);
2641
2642 // Manually scale up
2643 let new_ids = bus.manual_scale_up(2).await.unwrap();
2644 assert_eq!(new_ids.len(), 2);
2645 assert_eq!(bus.num_shards(), 4);
2646
2647 // Ingest events - they should be distributed across all shards
2648 for i in 0..100 {
2649 let event = Event::new(json!({"index": i}));
2650 bus.ingest(event).unwrap();
2651 }
2652
2653 tokio::time::sleep(Duration::from_millis(100)).await;
2654
2655 assert_eq!(
2656 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2657 100
2658 );
2659
2660 bus.shutdown().await.unwrap();
2661 }
2662
2663 /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2664 /// `manual_scale_down` only called `mapper.scale_down(count)`,
2665 /// which marks shards as `Draining` but does NOT finalize them.
2666 /// Bus configs without an active scaling monitor (or callers
2667 /// shutting down before the monitor's next tick) lost any
2668 /// events queued in the drained shards' ring buffers because
2669 /// `remove_shard_internal` was never invoked. The fix runs the
2670 /// full lifecycle synchronously: scale_down → poll for empty →
2671 /// finalize_draining → remove_shard_internal.
2672 ///
2673 /// We pin this by scaling up, manually scaling down, and
2674 /// asserting that `num_shards` actually decreases — pre-fix
2675 /// the count would still reflect the Draining shards.
2676 #[tokio::test]
2677 async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2678 let policy = ScalingPolicy {
2679 min_shards: 2,
2680 max_shards: 8,
2681 cooldown: Duration::from_nanos(1),
2682 ..Default::default()
2683 };
2684 let config = EventBusConfig::builder()
2685 .num_shards(2)
2686 .ring_buffer_capacity(1024)
2687 .scaling(policy)
2688 .build()
2689 .unwrap();
2690 let bus = EventBus::new(config).await.unwrap();
2691
2692 // Scale up to 4, then back down to 2.
2693 let added = bus.manual_scale_up(2).await.unwrap();
2694 assert_eq!(added.len(), 2);
2695 assert_eq!(bus.num_shards(), 4);
2696
2697 let removed = bus.manual_scale_down(2).await.unwrap();
2698 assert_eq!(
2699 removed.len(),
2700 2,
2701 "manual_scale_down must complete the lifecycle for both \
2702 requested shards (mark Draining → wait → finalize → remove)"
2703 );
2704
2705 // Pre-fix: `num_shards` would still be 4 because shards
2706 // were only marked Draining (and the routing-table removal
2707 // path never ran). Post-fix: it's back to 2.
2708 assert_eq!(
2709 bus.num_shards(),
2710 2,
2711 "drained shards must be removed from the routing table"
2712 );
2713
2714 bus.shutdown().await.unwrap();
2715 }
2716
2717 #[tokio::test]
2718 async fn test_shard_metrics() {
2719 let policy = ScalingPolicy::default();
2720
2721 let config = EventBusConfig::builder()
2722 .num_shards(2)
2723 .ring_buffer_capacity(1024)
2724 .scaling(policy)
2725 .build()
2726 .unwrap();
2727
2728 let bus = EventBus::new(config).await.unwrap();
2729
2730 // Ingest some events
2731 for i in 0..50 {
2732 let event = Event::new(json!({"index": i}));
2733 bus.ingest(event).unwrap();
2734 }
2735
2736 // Get metrics
2737 let metrics = bus.shard_metrics();
2738 assert!(metrics.is_some());
2739 let metrics = metrics.unwrap();
2740 assert_eq!(metrics.len(), 2);
2741
2742 bus.shutdown().await.unwrap();
2743 }
2744
2745 #[tokio::test]
2746 async fn test_regression_eventbus_drop_signals_shutdown() {
2747 // Regression: dropping an EventBus without calling shutdown() used to
2748 // leave background tasks running indefinitely. The Drop impl now sets
2749 // the shutdown flag so workers eventually exit.
2750 let result = tokio::time::timeout(Duration::from_secs(5), async {
2751 let config = EventBusConfig::builder()
2752 .num_shards(2)
2753 .ring_buffer_capacity(1024)
2754 .build()
2755 .unwrap();
2756
2757 let bus = EventBus::new(config).await.unwrap();
2758
2759 // Ingest some events
2760 for i in 0..10 {
2761 let event = Event::new(json!({"index": i}));
2762 bus.ingest(event).unwrap();
2763 }
2764
2765 // Drop without calling shutdown()
2766 drop(bus);
2767
2768 // If we reach here, the drop didn't hang
2769 })
2770 .await;
2771
2772 assert!(
2773 result.is_ok(),
2774 "EventBus drop should not hang — Drop impl must signal shutdown"
2775 );
2776 }
2777
2778 #[tokio::test]
2779 async fn test_with_dynamic_scaling_builder() {
2780 let config = EventBusConfig::builder()
2781 .num_shards(4)
2782 .ring_buffer_capacity(2048)
2783 .with_dynamic_scaling()
2784 .build()
2785 .unwrap();
2786
2787 let bus = EventBus::new(config).await.unwrap();
2788
2789 assert!(bus.is_dynamic_scaling_enabled());
2790 assert_eq!(bus.num_shards(), 4);
2791
2792 bus.shutdown().await.unwrap();
2793 }
2794
2795 /// Mock adapter that counts `on_batch` invocations and returns a
2796 /// configurable error variant. Used to assert dispatch retry
2797 /// semantics without dragging in a real adapter.
2798 struct CountingErrAdapter {
2799 calls: Arc<std::sync::atomic::AtomicU32>,
2800 make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
2801 }
2802
2803 #[async_trait::async_trait]
2804 impl crate::adapter::Adapter for CountingErrAdapter {
2805 async fn init(&mut self) -> Result<(), AdapterError> {
2806 Ok(())
2807 }
2808 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
2809 self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2810 Err((self.make_err)())
2811 }
2812 async fn flush(&self) -> Result<(), AdapterError> {
2813 Ok(())
2814 }
2815 async fn shutdown(&self) -> Result<(), AdapterError> {
2816 Ok(())
2817 }
2818 async fn poll_shard(
2819 &self,
2820 _shard_id: u16,
2821 _from_id: Option<&str>,
2822 _limit: usize,
2823 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2824 Ok(crate::adapter::ShardPollResult::empty())
2825 }
2826 fn name(&self) -> &'static str {
2827 "counting_err"
2828 }
2829 async fn is_healthy(&self) -> bool {
2830 true
2831 }
2832 }
2833
2834 /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
2835 /// retried every error variant, ignoring `AdapterError::is_retryable`.
2836 /// A non-retryable error (Connection / Fatal / Serialization)
2837 /// should now drop the batch immediately rather than burn the
2838 /// retry budget on something that cannot succeed.
2839 #[tokio::test(start_paused = true)]
2840 async fn dispatch_batch_skips_retries_on_non_retryable_error() {
2841 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2842 let adapter = CountingErrAdapter {
2843 calls: calls.clone(),
2844 make_err: Box::new(|| AdapterError::Connection("refused".into())),
2845 };
2846
2847 let batch = Batch::new(0, vec![], 0);
2848 let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 5).await;
2849
2850 assert!(!ok, "non-retryable error must drop batch");
2851 assert_eq!(
2852 calls.load(std::sync::atomic::Ordering::SeqCst),
2853 1,
2854 "Connection error must not be retried; expected exactly 1 on_batch call"
2855 );
2856 }
2857
2858 /// Sanity: a retryable error *does* go through the full retry
2859 /// budget. Without this companion check, the previous test could
2860 /// pass for the wrong reason (e.g. if dispatch always returned on
2861 /// the first error).
2862 #[tokio::test(start_paused = true)]
2863 async fn dispatch_batch_retries_transient_errors() {
2864 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2865 let adapter = CountingErrAdapter {
2866 calls: calls.clone(),
2867 make_err: Box::new(|| AdapterError::Transient("temp".into())),
2868 };
2869
2870 let batch = Batch::new(0, vec![], 0);
2871 let ok = dispatch_batch(&adapter, Arc::new(batch), 0, Duration::from_secs(1), 3).await;
2872
2873 assert!(!ok);
2874 // 3 retries + 1 final attempt = 4 total calls.
2875 assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
2876 }
2877
2878 /// Pin the perf contract behind the `Arc<Batch>` change.
2879 ///
2880 /// Pre-fix `dispatch_batch` deep-cloned the events `Vec` on every
2881 /// retry attempt (and the comment in production code already
2882 /// admitted this was wasted on the common `retries == 0` path).
2883 /// Post-fix the dispatcher takes `Arc<Batch>` and each retry is a
2884 /// refcount bump.
2885 ///
2886 /// We pin the contract observationally: an adapter that fails 3
2887 /// times then succeeds receives the *same* `Arc<Batch>` pointer
2888 /// on every call. A regression that ever rebuilt the Batch
2889 /// (e.g. a future refactor that does `Arc::new(batch.as_ref().clone())`)
2890 /// would surface here as distinct `Arc::as_ptr` values.
2891 #[tokio::test(start_paused = true)]
2892 async fn dispatch_batch_retries_share_the_same_arc_allocation() {
2893 // Identity-record `Arc::as_ptr(&batch)` per call via the
2894 // batch's strong-count snapshot. We cast to `usize` so the
2895 // adapter can stay `Send + Sync` without an unsafe impl —
2896 // the value is just an opaque identity stamp.
2897 struct PointerRecordingAdapter {
2898 seen: Arc<parking_lot::Mutex<Vec<usize>>>,
2899 fail_first_n: u32,
2900 calls: Arc<std::sync::atomic::AtomicU32>,
2901 }
2902
2903 #[async_trait::async_trait]
2904 impl crate::adapter::Adapter for PointerRecordingAdapter {
2905 async fn init(&mut self) -> Result<(), AdapterError> {
2906 Ok(())
2907 }
2908 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
2909 self.seen.lock().push(Arc::as_ptr(&batch) as usize);
2910 let n = self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2911 if n < self.fail_first_n {
2912 Err(AdapterError::Transient("retry me".into()))
2913 } else {
2914 Ok(())
2915 }
2916 }
2917 async fn flush(&self) -> Result<(), AdapterError> {
2918 Ok(())
2919 }
2920 async fn shutdown(&self) -> Result<(), AdapterError> {
2921 Ok(())
2922 }
2923 async fn poll_shard(
2924 &self,
2925 _shard_id: u16,
2926 _from_id: Option<&str>,
2927 _limit: usize,
2928 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2929 Ok(crate::adapter::ShardPollResult::empty())
2930 }
2931 fn name(&self) -> &'static str {
2932 "pointer-recording"
2933 }
2934 }
2935
2936 let seen = Arc::new(parking_lot::Mutex::new(Vec::new()));
2937 let adapter = PointerRecordingAdapter {
2938 seen: Arc::clone(&seen),
2939 fail_first_n: 3,
2940 calls: Arc::new(std::sync::atomic::AtomicU32::new(0)),
2941 };
2942 let batch = Arc::new(Batch::new(0, vec![], 0));
2943 let original_ptr = Arc::as_ptr(&batch) as usize;
2944
2945 let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
2946 assert!(ok, "adapter accepts on 4th try; dispatch must succeed");
2947
2948 let pointers = seen.lock().clone();
2949 assert_eq!(
2950 pointers.len(),
2951 4,
2952 "expected 3 failures + 1 success = 4 on_batch calls",
2953 );
2954 // Every call observed the SAME Arc target. A regression that
2955 // deep-clones would have produced four distinct pointers.
2956 for (i, p) in pointers.iter().enumerate() {
2957 assert_eq!(
2958 *p, original_ptr,
2959 "attempt {i} saw a different Arc<Batch>; dispatch_batch deep-cloned",
2960 );
2961 }
2962 }
2963
2964 /// Counting adapter that records the number of events delivered via
2965 /// `on_batch`. Used by shutdown-durability tests below.
2966 struct CountingAdapter {
2967 received: Arc<AtomicU64>,
2968 }
2969
2970 #[async_trait::async_trait]
2971 impl crate::adapter::Adapter for CountingAdapter {
2972 async fn init(&mut self) -> Result<(), AdapterError> {
2973 Ok(())
2974 }
2975 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
2976 self.received
2977 .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
2978 Ok(())
2979 }
2980 async fn flush(&self) -> Result<(), AdapterError> {
2981 Ok(())
2982 }
2983 async fn shutdown(&self) -> Result<(), AdapterError> {
2984 Ok(())
2985 }
2986 async fn poll_shard(
2987 &self,
2988 _shard_id: u16,
2989 _from_id: Option<&str>,
2990 _limit: usize,
2991 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2992 Ok(crate::adapter::ShardPollResult::empty())
2993 }
2994 fn name(&self) -> &'static str {
2995 "counting"
2996 }
2997 async fn is_healthy(&self) -> bool {
2998 true
2999 }
3000 }
3001
/// `retry_backoff` doubles its base delay with each attempt and adds
/// jitter keyed on (shard, attempt) so that shards do not retry in
/// lockstep. Two properties are pinned here: the per-attempt bounds
/// (including the cap) and the fact that different shard ids produce
/// different delays.
#[test]
fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
    use std::collections::HashSet;

    // For shard 0 the base delays for attempts 0..6 are 100, 200, 400,
    // 800, 1600, 3200, 3200 (capped) ms, each with ±25% jitter.
    let attempt0_ms = retry_backoff(0, 0).as_millis();
    let attempt1_ms = retry_backoff(0, 1).as_millis();
    let attempt4_ms = retry_backoff(0, 4).as_millis();
    let attempt5_ms = retry_backoff(0, 5).as_millis();
    let attempt6_ms = retry_backoff(0, 6).as_millis();

    // Each delay must sit within 75%..=125% of its base.
    assert!((75..=125).contains(&attempt0_ms));
    assert!((150..=250).contains(&attempt1_ms));
    assert!((1200..=2000).contains(&attempt4_ms));
    assert!((2400..=4000).contains(&attempt5_ms));

    // The base is capped at attempt 5, so attempt 6 may not go above
    // attempt 5's upper bound.
    assert!(
        attempt6_ms <= 4000,
        "attempt > 5 must cap at the attempt-5 base; got {}ms",
        attempt6_ms
    );

    // Jitter: sample 16 shard ids at the same attempt and require at
    // least 4 distinct delays.
    //
    // The threshold is intentionally loose. `DefaultHasher`'s exact
    // distribution is not stable across Rust toolchains, so a stricter
    // bound (say ≥ 8) could start flaking after a stdlib change even
    // though the property under test — decorrelation across shards —
    // still holds. 4 of 16 is still enough to catch a real regression
    // such as hashing only `attempt` and not `shard_id`, which would
    // collapse all 16 samples into one value.
    let mut distinct_backoffs: HashSet<u128> = HashSet::new();
    for shard in 0u16..16 {
        distinct_backoffs.insert(retry_backoff(shard, 2).as_millis());
    }
    assert!(
        distinct_backoffs.len() >= 4,
        "jitter must decorrelate retries across shards; \
         only {} unique backoffs across 16 shards",
        distinct_backoffs.len()
    );
}
3058
3059 /// CR-23: pin that `EventBus::shutdown` actually invokes the
3060 /// adapter's `flush()` and `shutdown()` methods. The existing
3061 /// `sdk/tests/shutdown_regression.rs` covers the
3062 /// "shutdown runs even with outstanding Arc clones" property
3063 /// using a memory adapter whose `flush`/`shutdown` are no-ops
3064 /// — so a regression that elided the adapter calls would still
3065 /// pass. This test uses a recording adapter that increments
3066 /// per-method counters; we assert flush AND shutdown both fired
3067 /// exactly once after a clean `bus.shutdown().await`.
3068 ///
3069 /// The fix routes `Net::shutdown` through
3070 /// `shutdown_via_ref`, which in turn calls
3071 /// `self.adapter.flush()` and `self.adapter.shutdown()` once
3072 /// each. CR-23 pins this contract at the bus layer so an
3073 /// inadvertent regression at the SDK or adapter wrapper layer
3074 /// can be caught without an integration setup.
3075 #[tokio::test]
3076 async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
3077 struct RecordingAdapter {
3078 on_batch_calls: Arc<AtomicU64>,
3079 flush_calls: Arc<AtomicU64>,
3080 shutdown_calls: Arc<AtomicU64>,
3081 }
3082
3083 #[async_trait::async_trait]
3084 impl crate::adapter::Adapter for RecordingAdapter {
3085 async fn init(&mut self) -> Result<(), AdapterError> {
3086 Ok(())
3087 }
3088 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3089 self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
3090 Ok(())
3091 }
3092 async fn flush(&self) -> Result<(), AdapterError> {
3093 self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
3094 Ok(())
3095 }
3096 async fn shutdown(&self) -> Result<(), AdapterError> {
3097 self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
3098 Ok(())
3099 }
3100 async fn poll_shard(
3101 &self,
3102 _shard_id: u16,
3103 _from_id: Option<&str>,
3104 _limit: usize,
3105 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3106 Ok(crate::adapter::ShardPollResult::empty())
3107 }
3108 fn name(&self) -> &'static str {
3109 "cr23-recording"
3110 }
3111 async fn is_healthy(&self) -> bool {
3112 true
3113 }
3114 }
3115
3116 let on_batch = Arc::new(AtomicU64::new(0));
3117 let flush = Arc::new(AtomicU64::new(0));
3118 let shutdown = Arc::new(AtomicU64::new(0));
3119 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
3120 on_batch_calls: on_batch.clone(),
3121 flush_calls: flush.clone(),
3122 shutdown_calls: shutdown.clone(),
3123 });
3124
3125 let config = EventBusConfig::builder()
3126 .num_shards(2)
3127 .ring_buffer_capacity(1024)
3128 .build()
3129 .unwrap();
3130 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3131
3132 // Drive a small burst so on_batch fires at least once —
3133 // pins that the adapter is wired up correctly. The
3134 // load-bearing assertions below are on flush and shutdown.
3135 for i in 0..16 {
3136 let _ = bus.ingest(Event::new(json!({"i": i})));
3137 }
3138
3139 // Pre-CR-23 a regression that elided one of these would
3140 // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3141 // because the memory adapter's flush/shutdown are no-ops.
3142 // Here the recording adapter makes the contract observable.
3143 bus.shutdown().await.unwrap();
3144
3145 assert!(
3146 on_batch.load(AtomicOrdering::SeqCst) > 0,
3147 "sanity: on_batch must have fired at least once"
3148 );
3149 assert_eq!(
3150 flush.load(AtomicOrdering::SeqCst),
3151 1,
3152 "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3153 );
3154 assert_eq!(
3155 shutdown.load(AtomicOrdering::SeqCst),
3156 1,
3157 "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3158 );
3159 }
3160
3161 /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3162 /// CAS loses (because a first caller already flipped the
3163 /// `shutdown` flag) and whose deadline elapses BEFORE the
3164 /// first caller sets `shutdown_completed=true` returns
3165 /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3166 ///
3167 /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3168 /// the CAS race had no way to distinguish "first caller
3169 /// finished shutdown" from "deadline timed out mid-shutdown."
3170 /// Under a slow adapter (`adapter_timeout` default 30s >
3171 /// the 10s spin deadline), the second caller silently saw
3172 /// `Ok` while the bus was still mid-shutdown — letting
3173 /// subsequent code observe a partially-shut-down bus.
3174 ///
3175 /// We use a slow first caller (sleeps inside `flush()`)
3176 /// and override the spin deadline to a few ms so the test
3177 /// runs fast.
3178 #[tokio::test]
3179 async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3180 struct SlowFlushAdapter {
3181 // Block flush() for this long. The first caller
3182 // gets stuck here while the second caller's spin
3183 // deadline elapses.
3184 flush_delay: std::time::Duration,
3185 }
3186
3187 #[async_trait::async_trait]
3188 impl crate::adapter::Adapter for SlowFlushAdapter {
3189 async fn init(&mut self) -> Result<(), AdapterError> {
3190 Ok(())
3191 }
3192 async fn on_batch(&self, _batch: Arc<Batch>) -> Result<(), AdapterError> {
3193 Ok(())
3194 }
3195 async fn flush(&self) -> Result<(), AdapterError> {
3196 tokio::time::sleep(self.flush_delay).await;
3197 Ok(())
3198 }
3199 async fn shutdown(&self) -> Result<(), AdapterError> {
3200 Ok(())
3201 }
3202 async fn poll_shard(
3203 &self,
3204 _shard_id: u16,
3205 _from_id: Option<&str>,
3206 _limit: usize,
3207 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3208 Ok(crate::adapter::ShardPollResult::empty())
3209 }
3210 fn name(&self) -> &'static str {
3211 "cr25-slow-flush"
3212 }
3213 async fn is_healthy(&self) -> bool {
3214 true
3215 }
3216 }
3217
3218 // Cubic P2: serialize access to the global deadline
3219 // override so concurrent tests don't interfere. Hold the
3220 // guard until the override is reset to 0 below.
3221 let _override_guard = super::shutdown_deadline_override_lock().await;
3222
3223 // Override the second-caller spin deadline to a short
3224 // value so the test doesn't wall-clock-wait 10s. Production
3225 // builds use the 10s default (the cfg(test) override is
3226 // compiled out).
3227 super::set_shutdown_via_ref_spin_deadline_for_test(50);
3228
3229 let config = EventBusConfig::builder()
3230 .num_shards(2)
3231 .ring_buffer_capacity(1024)
3232 .adapter_timeout(std::time::Duration::from_secs(5))
3233 .build()
3234 .unwrap();
3235 // First caller's flush() sleeps 500ms — far longer than
3236 // the 50ms spin deadline.
3237 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3238 flush_delay: std::time::Duration::from_millis(500),
3239 });
3240 let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3241
3242 // Spawn the FIRST caller — it wins the CAS and proceeds
3243 // into the slow flush. We don't await it; we want it
3244 // running in parallel.
3245 let bus_first = Arc::clone(&bus);
3246 let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3247
3248 // Cubic P2: poll `is_shutdown()` until the first caller
3249 // has set the flag, instead of a fixed sleep. This makes
3250 // the test scheduler-independent — we proceed as soon as
3251 // the first caller has won the CAS, regardless of how
3252 // tokio happens to schedule things. Bounded by a 1s
3253 // timeout so a regression that prevents `shutdown_via_ref`
3254 // from setting the flag doesn't hang the test.
3255 tokio::time::timeout(std::time::Duration::from_secs(1), async {
3256 while !bus.is_shutdown() {
3257 tokio::task::yield_now().await;
3258 }
3259 })
3260 .await
3261 .expect("first caller did not set shutdown flag within 1s");
3262
3263 // The second caller's CAS will fail; it enters the spin
3264 // and times out at 50ms.
3265 let start = std::time::Instant::now();
3266 let second_result = bus.shutdown_via_ref().await;
3267 let elapsed = start.elapsed();
3268
3269 // Reset the override for other tests, then drop the guard.
3270 super::set_shutdown_via_ref_spin_deadline_for_test(0);
3271
3272 // CR-25 contract: the second caller MUST get a Transient
3273 // error, not a silent Ok.
3274 let err = second_result.expect_err(
3275 "CR-25 regression: second caller MUST surface AdapterError::Transient \
3276 when its deadline elapses, NOT a silent Ok",
3277 );
3278 match err {
3279 AdapterError::Transient(msg) => {
3280 assert!(
3281 msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3282 "error message must reference the deadline path; got: {msg}"
3283 );
3284 }
3285 other => panic!("expected Transient, got {:?}", other),
3286 }
3287
3288 // Sanity: the second caller's elapsed time is bounded by
3289 // the override (50ms) + scheduler slop. If this is
3290 // anywhere near the production 10s, the cfg(test)
3291 // override path broke.
3292 assert!(
3293 elapsed < std::time::Duration::from_secs(5),
3294 "second caller took {elapsed:?}; the cfg(test) deadline override \
3295 broke if this is near the 10s production default"
3296 );
3297
3298 // Wait for the first caller to finish. Even though it
3299 // took the slow path, the bus IS shutting down and will
3300 // eventually complete.
3301 let _ = first_caller.await.unwrap();
3302 assert!(bus.is_shutdown_completed());
3303 }
3304
3305 /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3306 /// successfully-ingested event to the adapter before returning.
3307 /// Pins the broader durability contract that the
3308 /// `drain_finalize_ready` gate supports: the drain worker may not
3309 /// finalize until the in-flight wait completes.
3310 ///
3311 /// Tests across many shards with bursts large enough that the
3312 /// drain workers are mid-loop when shutdown begins.
3313 #[tokio::test]
3314 async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3315 let received = Arc::new(AtomicU64::new(0));
3316 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3317 received: received.clone(),
3318 });
3319
3320 let config = EventBusConfig::builder()
3321 .num_shards(4)
3322 .ring_buffer_capacity(4096)
3323 .build()
3324 .unwrap();
3325 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3326
3327 // Drive a sizable burst across all shards. Capacity > burst so
3328 // we don't trip backpressure; every successful Ok must reach
3329 // `on_batch` before shutdown returns.
3330 let total = 10_000usize;
3331 let mut successes = 0u64;
3332 for i in 0..total {
3333 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3334 successes += 1;
3335 }
3336 }
3337
3338 // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3339 // those workers wait on `drain_finalize_ready` after observing
3340 // `shutdown=true`, so any push the producer made before the
3341 // shutdown flag is guaranteed to be in the ring buffer when
3342 // the final sweep runs.
3343 bus.shutdown().await.unwrap();
3344
3345 let delivered = received.load(AtomicOrdering::SeqCst);
3346 assert_eq!(
3347 delivered, successes,
3348 "shutdown stranded events: {successes} ingested successfully, \
3349 only {delivered} reached the adapter"
3350 );
3351 }
3352
3353 /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3354 /// barrier: after it returns successfully, every event the
3355 /// caller successfully ingested before `flush()` was called
3356 /// must have been handed to the adapter via `on_batch`.
3357 /// The previous implementation slept a single `batch.max_delay`
3358 /// after the ring buffers drained, which left a window where
3359 /// events could still be sitting in the per-shard mpsc channel
3360 /// or inside a partially-filled batch awaiting timeout — those
3361 /// events were silently dropped from the flush guarantee.
3362 #[tokio::test]
3363 async fn flush_is_a_delivery_barrier() {
3364 let received = Arc::new(AtomicU64::new(0));
3365 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3366 received: received.clone(),
3367 });
3368
3369 // Use a deliberately *long* batch.max_delay (250ms) so that a
3370 // partially-filled batch sitting in the batch worker's
3371 // pending state would survive past the old single-`max_delay`
3372 // sleep. min_size > burst forces the partial-batch path.
3373 let config = EventBusConfig::builder()
3374 .num_shards(2)
3375 .ring_buffer_capacity(1024)
3376 .batch(crate::config::BatchConfig {
3377 min_size: 1_000,
3378 max_size: 10_000,
3379 max_delay: Duration::from_millis(250),
3380 adaptive: false,
3381 velocity_window: Duration::from_millis(100),
3382 })
3383 .build()
3384 .unwrap();
3385 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3386
3387 // A small burst — far below `min_size`, so the batch worker
3388 // will sit on a partial batch waiting for `max_delay`.
3389 let burst = 50usize;
3390 let mut successes = 0u64;
3391 for i in 0..burst {
3392 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3393 successes += 1;
3394 }
3395 }
3396
3397 // Time the flush call to confirm we waited long enough for
3398 // the partial batch to time out. The previous code slept
3399 // ~10ms total in the post-empty phase; the fix waits up to
3400 // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3401 let t0 = std::time::Instant::now();
3402 bus.flush().await.unwrap();
3403 let elapsed = t0.elapsed();
3404
3405 // After flush returns, every successful ingest must have
3406 // been delivered to the adapter. With the old code this
3407 // assertion would fail: events sit in the partial batch
3408 // until `max_delay` (250ms) elapses, but flush returned
3409 // after only ~10ms.
3410 let delivered = received.load(AtomicOrdering::SeqCst);
3411 assert_eq!(
3412 delivered, successes,
3413 "flush() returned but only {delivered} of {successes} \
3414 events reached the adapter (#16); flush waited {:?}",
3415 elapsed
3416 );
3417
3418 bus.shutdown().await.unwrap();
3419 }
3420
3421 /// Regression (Phase 1): when configured with a
3422 /// persistent `producer_nonce_path`, two bus instances launched
3423 /// against the same path stamp the SAME nonce on every emitted
3424 /// batch. JetStream / Redis adapters key dedup on this nonce, so
3425 /// a producer that crashed mid-batch and restarted (within the
3426 /// backend's dedup window) issues retries with the same msg-ids
3427 /// and the backend correctly recognizes them as duplicates.
3428 ///
3429 /// Pre-fix the per-process nonce regenerated on every startup,
3430 /// so post-crash retries wrote NEW msg-ids and the backend
3431 /// persisted the partial-batch's accepted half twice.
3432 #[tokio::test]
3433 async fn persistent_producer_nonce_survives_bus_restart() {
3434 // Use a per-test temp file so concurrent runs don't collide.
3435 let mut nonce_path = std::env::temp_dir();
3436 let pid = std::process::id();
3437 let nanos = std::time::SystemTime::now()
3438 .duration_since(std::time::UNIX_EPOCH)
3439 .map(|d| d.as_nanos())
3440 .unwrap_or(0);
3441 nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3442
3443 let make_config = |path: &std::path::Path| {
3444 EventBusConfig::builder()
3445 .num_shards(1)
3446 .ring_buffer_capacity(1024)
3447 .producer_nonce_path(path)
3448 .build()
3449 .unwrap()
3450 };
3451
3452 // First bus: ingest one event. Read its nonce off the
3453 // adapter-bound batch via a recording adapter.
3454 struct NonceRecordingAdapter {
3455 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3456 }
3457 #[async_trait::async_trait]
3458 impl crate::adapter::Adapter for NonceRecordingAdapter {
3459 async fn init(&mut self) -> Result<(), AdapterError> {
3460 Ok(())
3461 }
3462 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3463 *self.nonce.lock() = Some(batch.process_nonce);
3464 Ok(())
3465 }
3466 async fn flush(&self) -> Result<(), AdapterError> {
3467 Ok(())
3468 }
3469 async fn shutdown(&self) -> Result<(), AdapterError> {
3470 Ok(())
3471 }
3472 async fn poll_shard(
3473 &self,
3474 _: u16,
3475 _: Option<&str>,
3476 _: usize,
3477 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3478 Ok(crate::adapter::ShardPollResult::empty())
3479 }
3480 fn name(&self) -> &'static str {
3481 "nonce-recording"
3482 }
3483 }
3484
3485 let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3486 {
3487 let bus = EventBus::new_with_adapter(
3488 make_config(&nonce_path),
3489 Box::new(NonceRecordingAdapter {
3490 nonce: nonce_first_run.clone(),
3491 }),
3492 )
3493 .await
3494 .unwrap();
3495 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3496 bus.flush().await.unwrap();
3497 bus.shutdown().await.unwrap();
3498 }
3499
3500 let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3501 {
3502 let bus = EventBus::new_with_adapter(
3503 make_config(&nonce_path),
3504 Box::new(NonceRecordingAdapter {
3505 nonce: nonce_second_run.clone(),
3506 }),
3507 )
3508 .await
3509 .unwrap();
3510 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3511 bus.flush().await.unwrap();
3512 bus.shutdown().await.unwrap();
3513 }
3514
3515 let n_a = nonce_first_run
3516 .lock()
3517 .expect("first bus must have dispatched a batch");
3518 let n_b = nonce_second_run
3519 .lock()
3520 .expect("second bus must have dispatched a batch");
3521 assert_eq!(
3522 n_a, n_b,
3523 "two bus instances against the same producer_nonce_path \
3524 must stamp the same nonce — pre-fix this regenerated on \
3525 every restart and JetStream's dedup window saw new \
3526 msg-ids as fresh batches",
3527 );
3528
3529 // Cleanup.
3530 let _ = std::fs::remove_file(&nonce_path);
3531 }
3532
3533 /// Pin that ALL spawn sites — both the static initial-shard
3534 /// loop in `new_with_adapter` and the dynamic-add path in
3535 /// `add_shard_internal` — clone the bus's loaded
3536 /// `producer_nonce` correctly. Pre-#56 there was no nonce
3537 /// concept at the bus layer; if any future refactor drops the
3538 /// `producer_nonce: self.producer_nonce` line from one of the
3539 /// spawn sites (or stops loading the persistent path), the
3540 /// post-scale-up shard's batches would carry a different nonce
3541 /// and JetStream's cross-restart dedup would silently break for
3542 /// events ingested into the dynamic shard. Pin all observed
3543 /// batches across the static + dynamic shards share the bus's
3544 /// nonce.
3545 #[tokio::test]
3546 async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3547 let mut nonce_path = std::env::temp_dir();
3548 let pid = std::process::id();
3549 let nanos = std::time::SystemTime::now()
3550 .duration_since(std::time::UNIX_EPOCH)
3551 .map(|d| d.as_nanos())
3552 .unwrap_or(0);
3553 nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3554
3555 struct CollectingAdapter {
3556 nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3557 }
3558 #[async_trait::async_trait]
3559 impl crate::adapter::Adapter for CollectingAdapter {
3560 async fn init(&mut self) -> Result<(), AdapterError> {
3561 Ok(())
3562 }
3563 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3564 self.nonces.lock().push(batch.process_nonce);
3565 Ok(())
3566 }
3567 async fn flush(&self) -> Result<(), AdapterError> {
3568 Ok(())
3569 }
3570 async fn shutdown(&self) -> Result<(), AdapterError> {
3571 Ok(())
3572 }
3573 async fn poll_shard(
3574 &self,
3575 _: u16,
3576 _: Option<&str>,
3577 _: usize,
3578 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3579 Ok(crate::adapter::ShardPollResult::empty())
3580 }
3581 fn name(&self) -> &'static str {
3582 "collecting"
3583 }
3584 }
3585
3586 let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3587 let policy = ScalingPolicy {
3588 min_shards: 1,
3589 max_shards: 8,
3590 cooldown: Duration::from_nanos(1),
3591 ..Default::default()
3592 };
3593 let config = EventBusConfig::builder()
3594 .num_shards(2)
3595 .ring_buffer_capacity(1024)
3596 .scaling(policy)
3597 .producer_nonce_path(&nonce_path)
3598 .build()
3599 .unwrap();
3600
3601 let bus = EventBus::new_with_adapter(
3602 config,
3603 Box::new(CollectingAdapter {
3604 nonces: nonces.clone(),
3605 }),
3606 )
3607 .await
3608 .unwrap();
3609
3610 // Drive the two static shards.
3611 for i in 0..200u64 {
3612 let _ = bus.ingest(Event::new(json!({"i": i})));
3613 }
3614 bus.flush().await.unwrap();
3615
3616 // Add a dynamic shard and drive it too.
3617 let _ = bus.manual_scale_up(1).await.unwrap();
3618 for i in 200..400u64 {
3619 let _ = bus.ingest(Event::new(json!({"i": i})));
3620 }
3621 bus.flush().await.unwrap();
3622
3623 bus.shutdown().await.unwrap();
3624
3625 let observed = nonces.lock().clone();
3626 assert!(
3627 !observed.is_empty(),
3628 "expected the adapter to have observed at least one batch",
3629 );
3630 let first = observed[0];
3631 for (i, &n) in observed.iter().enumerate() {
3632 assert_eq!(
3633 n, first,
3634 "batch {i} stamped a different nonce ({n:#x}) than the first \
3635 batch ({first:#x}) — at least one spawn site (initial-shard \
3636 loop or `add_shard_internal`) failed to inherit the bus's \
3637 producer_nonce",
3638 );
3639 }
3640
3641 let _ = std::fs::remove_file(&nonce_path);
3642 }
3643
3644 /// Regression guard for the `(process_nonce, shard_id,
3645 /// sequence_start, i)` JetStream / Redis msg-id construction.
3646 /// Within one bus instance, `sequence_start` per shard MUST be
3647 /// strictly monotonic across batches AND every two adjacent
3648 /// batches `n` and `n+1` from the same shard MUST satisfy
3649 /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3650 /// no overlap. A regression here breaks the at-most-once dedup
3651 /// every persistent adapter relies on: gaps reuse a `(shard,
3652 /// seq, i)` tuple after the dedup window closes, and overlap
3653 /// silently overlays a later batch on an earlier one's slot.
3654 ///
3655 /// The cross-restart variant (persistent `next_sequence` across
3656 /// process boots) is feature-shaped, not a bug fix — without
3657 /// persistence, restart relies on `process_nonce` rotating to
3658 /// disjoin the msg-id namespace (pinned by
3659 /// `persistent_producer_nonce_survives_bus_restart`). This test
3660 /// pins the within-process invariant the persistent nonce
3661 /// builds on.
3662 #[tokio::test]
3663 async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3664 struct ShardSeqRecorder {
3665 batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3666 }
3667 #[async_trait::async_trait]
3668 impl crate::adapter::Adapter for ShardSeqRecorder {
3669 async fn init(&mut self) -> Result<(), AdapterError> {
3670 Ok(())
3671 }
3672 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3673 self.batches.lock().push((
3674 batch.shard_id,
3675 batch.sequence_start,
3676 batch.events.len(),
3677 ));
3678 Ok(())
3679 }
3680 async fn flush(&self) -> Result<(), AdapterError> {
3681 Ok(())
3682 }
3683 async fn shutdown(&self) -> Result<(), AdapterError> {
3684 Ok(())
3685 }
3686 async fn poll_shard(
3687 &self,
3688 _: u16,
3689 _: Option<&str>,
3690 _: usize,
3691 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3692 Ok(crate::adapter::ShardPollResult::empty())
3693 }
3694 fn name(&self) -> &'static str {
3695 "shard-seq-recorder"
3696 }
3697 }
3698
3699 let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3700 let config = EventBusConfig::builder()
3701 .num_shards(4)
3702 .ring_buffer_capacity(1024)
3703 .build()
3704 .unwrap();
3705 let bus = EventBus::new_with_adapter(
3706 config,
3707 Box::new(ShardSeqRecorder {
3708 batches: batches.clone(),
3709 }),
3710 )
3711 .await
3712 .unwrap();
3713
3714 // Three drive-then-flush rounds so each shard sees multiple
3715 // batches; the across-batch monotonicity invariant is what
3716 // we're pinning, so a single batch per shard wouldn't
3717 // exercise it.
3718 for round in 0..3u64 {
3719 for i in 0..200u64 {
3720 bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3721 }
3722 bus.flush().await.unwrap();
3723 }
3724 bus.shutdown().await.unwrap();
3725
3726 let observed = batches.lock().clone();
3727 assert!(
3728 !observed.is_empty(),
3729 "expected the adapter to have observed at least one batch",
3730 );
3731
3732 // Bucket batches per shard, preserving dispatch order.
3733 let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3734 std::collections::HashMap::new();
3735 for (shard, seq_start, len) in observed {
3736 by_shard.entry(shard).or_default().push((seq_start, len));
3737 }
3738
3739 for (shard, runs) in &by_shard {
3740 assert!(
3741 !runs.is_empty(),
3742 "shard {shard}: must have at least one batch",
3743 );
3744 // First batch of every shard starts at the per-shard
3745 // zero — `BatchWorker::next_sequence` is initialized to
3746 // 0 and the first `flush()` reads it before the
3747 // `saturating_add`. A regression that lazily seeds
3748 // `next_sequence` from a non-zero source (e.g. wall
3749 // clock) would trip here.
3750 assert_eq!(
3751 runs[0].0, 0,
3752 "shard {shard}: first batch must start at sequence 0, got {}",
3753 runs[0].0,
3754 );
3755 for window in runs.windows(2) {
3756 let (prev_start, prev_len) = window[0];
3757 let (next_start, _next_len) = window[1];
3758 let expected_next = prev_start
3759 .checked_add(prev_len as u64)
3760 .expect("test bounds keep us well below u64::MAX");
3761 assert!(
3762 next_start > prev_start,
3763 "shard {shard}: sequence_start must be strictly monotonic; \
3764 saw {prev_start} → {next_start}",
3765 );
3766 assert_eq!(
3767 next_start, expected_next,
3768 "shard {shard}: gap or overlap in sequence_start; \
3769 prev=({prev_start}, len={prev_len}) → next={next_start}, \
3770 expected {expected_next}. A gap reuses (shard, seq, i) \
3771 tuples after the JetStream/Redis dedup window closes; an \
3772 overlap silently overlays a later batch on an earlier \
3773 one's slot.",
3774 );
3775 }
3776 }
3777 }
3778
3779 /// Pin the within-process caching contract for the fallback
3780 /// (no-`producer_nonce_path`) path: two bus instances created
3781 /// in the same process see the SAME `batch_process_nonce()`
3782 /// because the helper is `OnceLock`-cached. The
3783 /// "different-across-restarts" semantic is a *process-level*
3784 /// guarantee — restart the process to get a fresh nonce — and
3785 /// is pinned by `persistent_producer_nonce_survives_bus_restart`
3786 /// (which uses a path; the without-path branch of #56 has no
3787 /// cross-restart guarantee by design).
3788 ///
3789 /// Cubic-ai P3: this test was previously named
3790 /// `process_nonce_fallback_differs_across_bus_instances`, which
3791 /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
3792 /// Renamed to match what it actually pins.
3793 #[tokio::test]
3794 async fn process_nonce_fallback_is_cached_within_process() {
3795 struct NonceRecordingAdapter {
3796 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3797 }
3798 #[async_trait::async_trait]
3799 impl crate::adapter::Adapter for NonceRecordingAdapter {
3800 async fn init(&mut self) -> Result<(), AdapterError> {
3801 Ok(())
3802 }
3803 async fn on_batch(&self, batch: Arc<Batch>) -> Result<(), AdapterError> {
3804 *self.nonce.lock() = Some(batch.process_nonce);
3805 Ok(())
3806 }
3807 async fn flush(&self) -> Result<(), AdapterError> {
3808 Ok(())
3809 }
3810 async fn shutdown(&self) -> Result<(), AdapterError> {
3811 Ok(())
3812 }
3813 async fn poll_shard(
3814 &self,
3815 _: u16,
3816 _: Option<&str>,
3817 _: usize,
3818 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3819 Ok(crate::adapter::ShardPollResult::empty())
3820 }
3821 fn name(&self) -> &'static str {
3822 "nonce-recording"
3823 }
3824 }
3825
3826 let cfg = || {
3827 EventBusConfig::builder()
3828 .num_shards(1)
3829 .ring_buffer_capacity(1024)
3830 .build()
3831 .unwrap()
3832 };
3833
3834 let n_a = Arc::new(parking_lot::Mutex::new(None));
3835 let n_b = Arc::new(parking_lot::Mutex::new(None));
3836 {
3837 let bus = EventBus::new_with_adapter(
3838 cfg(),
3839 Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
3840 )
3841 .await
3842 .unwrap();
3843 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3844 bus.flush().await.unwrap();
3845 bus.shutdown().await.unwrap();
3846 }
3847 {
3848 let bus = EventBus::new_with_adapter(
3849 cfg(),
3850 Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
3851 )
3852 .await
3853 .unwrap();
3854 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3855 bus.flush().await.unwrap();
3856 bus.shutdown().await.unwrap();
3857 }
3858
3859 // Note: in a single-process test BOTH bus instances see the
3860 // same `OnceLock`-cached `batch_process_nonce`, so the
3861 // nonces ARE equal here even though the documented
3862 // semantic is "fresh per process." This test pins the
3863 // cached-within-a-process invariant; the across-PROCESSES
3864 // semantic is exercised by the persistent-nonce test
3865 // above (which is the actually-load-bearing path for the
3866 // persistent-nonce fix).
3867 let n_a = n_a.lock().unwrap();
3868 let n_b = n_b.lock().unwrap();
3869 assert_eq!(
3870 n_a, n_b,
3871 "within one process, batch_process_nonce is OnceLock-cached \
3872 so two bus instances see the same nonce — the \
3873 different-across-restarts contract is process-level, \
3874 pinned via `persistent_producer_nonce_survives_bus_restart`",
3875 );
3876 }
3877
3878 /// Regression: `EventBusStats::batches_dispatched`
3879 /// (and the new `events_dispatched`) must actually increment on
3880 /// every successful adapter dispatch. Pre-fix `batches_dispatched`
3881 /// was declared but never updated, so flush()'s Phase 2 progress
3882 /// gate was constant-zero and early-broke after one window —
3883 /// flake on Windows-class timer resolution. Pin both counters
3884 /// directly here so a future refactor that drops the increment
3885 /// fails this test, not the timing-dependent
3886 /// `flush_is_a_delivery_barrier`.
3887 #[tokio::test]
3888 async fn dispatch_increments_bus_level_event_and_batch_counters() {
3889 let received = Arc::new(AtomicU64::new(0));
3890 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3891 received: received.clone(),
3892 });
3893
3894 let config = EventBusConfig::builder()
3895 .num_shards(2)
3896 .ring_buffer_capacity(1024)
3897 .batch(crate::config::BatchConfig {
3898 min_size: 1,
3899 max_size: 10,
3900 max_delay: Duration::from_millis(10),
3901 adaptive: false,
3902 velocity_window: Duration::from_millis(100),
3903 })
3904 .build()
3905 .unwrap();
3906 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3907
3908 for i in 0..50 {
3909 bus.ingest(Event::new(json!({"i": i}))).unwrap();
3910 }
3911 bus.flush().await.unwrap();
3912
3913 let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
3914 let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
3915 assert!(
3916 batches > 0,
3917 "batches_dispatched must be > 0 after flush — pre-fix it was \
3918 never incremented anywhere, breaking flush()'s Phase 2 progress gate",
3919 );
3920 assert_eq!(
3921 events, 50,
3922 "events_dispatched must equal the number of events handed to \
3923 the adapter (got {events}, expected 50)",
3924 );
3925
3926 bus.shutdown().await.unwrap();
3927 }
3928
3929 /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
3930 /// still release the drain-finalize gate so detached drain
3931 /// workers can exit instead of parking on the gate until the
3932 /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
3933 /// keeps the `Drop` impl honest if someone refactors the
3934 /// shutdown gates later.
3935 #[tokio::test]
3936 async fn drop_releases_drain_finalize_gate_promptly() {
3937 let config = EventBusConfig::builder()
3938 .num_shards(2)
3939 .ring_buffer_capacity(1024)
3940 .build()
3941 .unwrap();
3942 let bus = EventBus::new(config).await.unwrap();
3943 let drain_gate = bus.drain_finalize_ready.clone();
3944
3945 // Drop without an awaited shutdown.
3946 drop(bus);
3947
3948 // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
3949 // is 10s; if Drop didn't flip the gate, drain workers would
3950 // park for up to that long before exiting.
3951 assert!(
3952 drain_gate.load(AtomicOrdering::Acquire),
3953 "Drop must release `drain_finalize_ready` so detached drain \
3954 workers exit promptly"
3955 );
3956 }
3957
3958 /// The lossy-shutdown reconciliation at L1615-L1635 has TWO
3959 /// arms after the deadline-snapshot fires:
3960 ///
3961 /// - `actual_drops > 0` → bump `events_dropped`
3962 /// (already exercised by the deadline path running with
3963 /// real producers that never recover)
3964 /// - `actual_drops == 0` → clear the eager
3965 /// `shutdown_was_lossy = true` because the drain caught
3966 /// up to every stranded ingest
3967 ///
3968 /// The second arm is the false-positive-alert fix: pre-fix,
3969 /// the `was_lossy` flag stayed `true` against
3970 /// `events_dropped == 0` for any deadline-triggered shutdown
3971 /// whose final drain happened to catch up — operator
3972 /// dashboards alerting on `was_lossy && dropped == 0` saw
3973 /// false positives.
3974 ///
3975 /// We pin the clear-flag arm by:
3976 /// 1. Manually staging `in_flight_ingests = 1` so the
3977 /// shutdown spin loop sees pending work.
3978 /// 2. Using `tokio::time::advance(6s)` to force the
3979 /// hardcoded 5-second deadline (the comments at L1399
3980 /// and L1725 confirm the deadline uses
3981 /// `tokio::time::Instant` precisely so paused-time
3982 /// tests can virtualize it).
3983 /// 3. Simulating the stranded producer completing AFTER
3984 /// the deadline snapshot: bump `events_ingested += 1`
3985 /// and drop the in-flight counter back to 0. The
3986 /// post-drain reconciliation reads
3987 /// `post_deadline_ingests = 1 - 0 = 1 >= stranded = 1`
3988 /// so `actual_drops == 0` and the arm clears
3989 /// `shutdown_was_lossy`.
3990 #[tokio::test(start_paused = true)]
3991 async fn lossy_shutdown_reconciliation_clears_was_lossy_when_drain_catches_up() {
3992 let config = EventBusConfig::builder()
3993 .num_shards(1)
3994 .ring_buffer_capacity(1024)
3995 .without_scaling()
3996 .build()
3997 .unwrap();
3998 let bus = Arc::new(
3999 EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
4000 .await
4001 .unwrap(),
4002 );
4003
4004 // Stage one "stranded" in-flight ingest. Real producers
4005 // would have incremented this via try_enter_ingest and
4006 // be mid-call to `shard_manager.ingest`; the test helper
4007 // (defined alongside `EventBus`) holds the counter at 1
4008 // for the duration of the deadline-window.
4009 bus.stage_stranded_ingest(1);
4010
4011 // Spawn shutdown — it'll enter the in-flight spin loop
4012 // and park on `tokio::time::sleep(1ms)` until time
4013 // advances.
4014 let bus_for_shutdown = Arc::clone(&bus);
4015 let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });
4016
4017 // Yield enough times for the shutdown task to enter the
4018 // spin loop. With paused time, the spin's 1ms sleep
4019 // blocks until we advance.
4020 for _ in 0..10 {
4021 tokio::task::yield_now().await;
4022 }
4023
4024 // Advance past the 5s deadline. The next loop iteration
4025 // observes the deadline, captures `(stranded=1,
4026 // ingested=0, dispatched=0)`, sets `was_lossy = true`,
4027 // and breaks.
4028 tokio::time::advance(std::time::Duration::from_secs(6)).await;
4029 for _ in 0..10 {
4030 tokio::task::yield_now().await;
4031 }
4032
4033 // The eager-set was_lossy flag is visible now.
4034 assert!(
4035 bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4036 "deadline must have set was_lossy=true (eagerly)",
4037 );
4038
4039 // Now simulate the "stranded" producer completing AFTER
4040 // the deadline. The helper bumps events_ingested with
4041 // Release ordering before releasing the in-flight slot
4042 // — matching what a real `ingest` call site does, so
4043 // the reconciliation reads the same arithmetic.
4044 bus.complete_stranded_ingest(1);
4045
4046 // Let shutdown finish — its post-drain reconciliation
4047 // now reads ingested_after=1, post_deadline_ingests=1,
4048 // actual_drops=stranded(1)-1=0 → clears was_lossy.
4049 let _ = shutdown_task.await.unwrap();
4050
4051 assert!(
4052 !bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4053 "reconciliation must clear was_lossy when drain catches up to every stranded ingest \
4054 (post_deadline_ingests >= stranded → actual_drops == 0)",
4055 );
4056 // And events_dropped must NOT have been bumped — the
4057 // whole point of the reconciliation arm is that no event
4058 // was actually lost.
4059 assert_eq!(
4060 bus.stats.events_dropped.load(AtomicOrdering::Acquire),
4061 0,
4062 "no events were actually dropped — reconciliation must not bump events_dropped",
4063 );
4064 }
4065
4066 /// Companion to `lossy_shutdown_reconciliation_clears_was_lossy_*`:
4067 /// pins the OTHER arm of the same if/else at L1615-L1619.
4068 /// When drain does NOT catch up (post_deadline_ingests <
4069 /// stranded), `actual_drops > 0` and `events_dropped` is
4070 /// bumped. The `was_lossy = true` flag set eagerly before the
4071 /// reconciliation stays true here — pre-fix the same code
4072 /// path bumped events_dropped += stranded EAGERLY *and*
4073 /// counted the same events as ingested, breaking the
4074 /// `ingested == dispatched + dropped` invariant.
4075 #[tokio::test(start_paused = true)]
4076 async fn lossy_shutdown_reconciliation_bumps_events_dropped_when_drain_does_not_catch_up() {
4077 let config = EventBusConfig::builder()
4078 .num_shards(1)
4079 .ring_buffer_capacity(1024)
4080 .without_scaling()
4081 .build()
4082 .unwrap();
4083 let bus = Arc::new(
4084 EventBus::new_with_adapter(config, Box::new(crate::adapter::NoopAdapter::new()))
4085 .await
4086 .unwrap(),
4087 );
4088
4089 // Stage two stranded ingests so the actual_drops count
4090 // is meaningful (not just 0 vs 1).
4091 bus.stage_stranded_ingest(2);
4092
4093 let bus_for_shutdown = Arc::clone(&bus);
4094 let shutdown_task = tokio::spawn(async move { bus_for_shutdown.shutdown_via_ref().await });
4095
4096 for _ in 0..10 {
4097 tokio::task::yield_now().await;
4098 }
4099 tokio::time::advance(std::time::Duration::from_secs(6)).await;
4100 for _ in 0..10 {
4101 tokio::task::yield_now().await;
4102 }
4103
4104 // Drop the in-flight counter to 0 so subsequent shutdown
4105 // bookkeeping doesn't see phantom in-flight work, but do
4106 // NOT bump events_ingested — the producers never
4107 // "completed," so the reconciliation must classify them
4108 // as drops.
4109 bus.release_stranded_ingest(2);
4110
4111 let _ = shutdown_task.await.unwrap();
4112
4113 assert!(
4114 bus.stats.shutdown_was_lossy.load(AtomicOrdering::Acquire),
4115 "was_lossy must stay true when drain did not catch up",
4116 );
4117 // stranded=2, post_deadline_ingests=0, actual_drops=2.
4118 // We don't pin == 2 exactly because the "known under-count
4119 // window" doc'd at L1595-L1614 acknowledges a ±1 bias
4120 // under racy interleavings; the floor is what matters.
4121 assert!(
4122 bus.stats.events_dropped.load(AtomicOrdering::Acquire) >= 1,
4123 "events_dropped must reflect at least one stranded ingest",
4124 );
4125 }
4126
4127 /// `remove_shard_internal` at L832-L859 dispatches any
4128 /// stranded ring-buffer events through the adapter as a
4129 /// one-shot batch. The normal scale-down path drains the
4130 /// ring via the drain worker before remove fires, so
4131 /// `stranded.is_empty()` on the happy path and L832-L859
4132 /// never runs — but a race window between drain completion
4133 /// and remove (or a manual remove on a shard whose drain
4134 /// worker is parked) leaves events stranded. Pre-fix any
4135 /// stranded events would have been silently dropped; the
4136 /// L832-L859 path now flushes them and bumps both
4137 /// `batches_dispatched` (+1) and `events_dispatched` (+N).
4138 ///
4139 /// We force the stranded-events condition by:
4140 /// 1. Pausing time so the drain worker can't pull from
4141 /// the ring (its polling sleeps don't tick).
4142 /// 2. Pushing raw events directly into shard 0's ring via
4143 /// the `with_shard` API.
4144 /// 3. Calling `remove_shard_internal(0)` — its inner
4145 /// `shard_manager.remove_shard` then returns the
4146 /// stranded vec, triggering the L832-L859 dispatch.
4147 #[tokio::test(start_paused = true)]
4148 async fn remove_shard_internal_dispatches_stranded_ring_buffer_events() {
4149 let received = Arc::new(AtomicU64::new(0));
4150 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
4151 received: Arc::clone(&received),
4152 });
4153 // Keep the default scaling policy: `remove_shard_internal`
4154 // routes through `shard_manager.remove_shard`, which
4155 // requires the dynamic-scaling mapper. `without_scaling()`
4156 // turns the mapper off and the remove fails with
4157 // "Dynamic scaling not enabled". The scaling MONITOR
4158 // task is a separate concern and isn't auto-started by
4159 // `new_with_adapter` — see
4160 // `start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it`
4161 // for the lifecycle contract.
4162 let config = EventBusConfig::builder()
4163 .num_shards(2)
4164 .ring_buffer_capacity(1024)
4165 .build()
4166 .unwrap();
4167 let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
4168
4169 // Yield enough times for the spawned drain workers to
4170 // reach their first `tokio::time::sleep` and park. With
4171 // `start_paused` they can't tick once parked; the gate
4172 // is here so the worker on shard 0 has *already parked*
4173 // before we push events into the ring. Without this, a
4174 // worker that hadn't yet hit its first park could race
4175 // the push and dispatch via the normal drain path
4176 // instead of the L832-L859 stranded path the test exists
4177 // to pin.
4178 for _ in 0..10 {
4179 tokio::task::yield_now().await;
4180 }
4181
4182 // Stage 5 events directly in shard 0's ring. With paused
4183 // time the drain worker can't pull them out — they sit
4184 // there until remove fires.
4185 let stranded_count = 5u64;
4186 let pushed = bus.shard_manager.with_shard(0, |shard| {
4187 for i in 0..stranded_count {
4188 shard
4189 .try_push_raw(bytes::Bytes::from(format!("stranded-{i}")))
4190 .unwrap();
4191 }
4192 });
4193 assert!(
4194 pushed.is_some(),
4195 "shard 0 must exist for try_push_raw to land",
4196 );
4197
4198 let batches_before = bus.stats.batches_dispatched.load(AtomicOrdering::Acquire);
4199 let events_before = bus.stats.events_dispatched.load(AtomicOrdering::Acquire);
4200
4201 // Directly invoke the internal — the production path
4202 // routes through manual_scale_down + finalize_draining,
4203 // but the L832-L859 dispatch arm is the same regardless
4204 // of how we got here.
4205 bus.remove_shard_internal(0)
4206 .await
4207 .expect("remove_shard_internal must succeed");
4208
4209 // The CountingAdapter saw the stranded batch.
4210 assert_eq!(
4211 received.load(AtomicOrdering::SeqCst),
4212 stranded_count,
4213 "stranded events must reach the adapter",
4214 );
4215 // Bus stats bumped by exactly the right amounts.
4216 assert_eq!(
4217 bus.stats.batches_dispatched.load(AtomicOrdering::Acquire),
4218 batches_before + 1,
4219 "exactly one stranded batch must increment batches_dispatched",
4220 );
4221 assert_eq!(
4222 bus.stats.events_dispatched.load(AtomicOrdering::Acquire),
4223 events_before + stranded_count,
4224 "events_dispatched must reflect every stranded event",
4225 );
4226
4227 // Clean shutdown so the remaining shard's drain worker
4228 // exits cleanly rather than getting torn down by the
4229 // bus's `Drop` impl (which silently loses any events
4230 // still in the ring buffers / mpsc channels). Mirrors
4231 // the lifecycle hygiene the other paused-time tests in
4232 // this module follow.
4233 //
4234 // The remaining drain worker (shard 1) is parked on
4235 // `tokio::time::sleep`; tokio's paused-time auto-advance
4236 // wakes it once every task is parked on time, but we
4237 // yield-loop here anyway so the JoinHandle await below
4238 // doesn't depend on auto-advance heuristics.
4239 bus.shutdown_via_ref()
4240 .await
4241 .expect("shutdown must succeed under paused time");
4242 }
4243
4244 /// `EventBus::new_with_adapter` must surface a `Fatal` error
4245 /// — naming the configured path — when `producer_nonce_path`
4246 /// points at a location the runtime can't read or create. The
4247 /// path-load failure path at L292-L301 is the single signal
4248 /// operators get that their persistent-nonce wiring is wrong;
4249 /// silently falling back to the per-process nonce would
4250 /// silently degrade the durability guarantee that path was
4251 /// chosen to provide (at-most-once across restart).
4252 #[tokio::test]
4253 async fn new_with_adapter_surfaces_fatal_when_nonce_path_unreadable() {
4254 // Point at a path whose parent directory doesn't exist.
4255 // `load_or_create` has to mkdir-then-write, so a missing
4256 // parent fails on every platform. Anchoring under
4257 // `std::env::temp_dir()` keeps the path well-formed on
4258 // both Windows (`C:\Users\...\Temp\...`) and Linux/macOS
4259 // (`/tmp/...`) — a hardcoded `C:\\…` literal looks like a
4260 // valid filename on POSIX filesystems and the create
4261 // happily succeeds, silently bypassing the failure-arm
4262 // this test exists to pin.
4263 let marker = "net-coverage-nonexistent-parent";
4264 let bogus_path = std::env::temp_dir()
4265 .join(marker)
4266 .join("deeply")
4267 .join("nested")
4268 .join("nonce");
4269 let config = EventBusConfig::builder()
4270 .num_shards(1)
4271 .ring_buffer_capacity(1024)
4272 .producer_nonce_path(&bogus_path)
4273 .build()
4274 .unwrap();
4275 let adapter: Box<dyn crate::adapter::Adapter> =
4276 Box::new(crate::adapter::NoopAdapter::new());
4277
4278 // EventBus doesn't impl Debug, so we can't unwrap_err
4279 // it directly — pattern-match instead.
4280 let result = EventBus::new_with_adapter(config, adapter).await;
4281 match result {
4282 Err(AdapterError::Fatal(msg)) => {
4283 assert!(
4284 msg.contains("producer-nonce") || msg.contains("nonce"),
4285 "error must name the failing subsystem: {msg}",
4286 );
4287 // The path must appear in the message so operators
4288 // can grep their logs to the misconfigured config
4289 // field without guessing. The unique marker
4290 // segment is portable across platforms (the temp
4291 // root differs but the joined marker is the same).
4292 assert!(
4293 msg.contains(marker),
4294 "error must include the configured path: {msg}",
4295 );
4296 }
4297 Err(other) => panic!("expected Fatal, got {other:?}"),
4298 Ok(_) => panic!("expected Fatal, got Ok"),
4299 }
4300 }
4301
4302 /// `start_scaling_monitor` must be a no-op on a non-scaling
4303 /// bus AND idempotent under repeated calls on a scaling bus.
4304 /// A regression that drops the early-returns would spawn
4305 /// shadow monitor tasks that hold a Weak<EventBus> until they
4306 /// next observe shutdown — pure-overhead duplicate
4307 /// metrics-tick wakeup rates that compete for the same
4308 /// `evaluate_scaling` lock.
4309 #[tokio::test]
4310 async fn start_scaling_monitor_is_noop_without_scaling_and_idempotent_with_it() {
4311 // Scaling explicitly disabled: start_scaling_monitor
4312 // must return before touching the slot. (The builder
4313 // default fills in a ScalingPolicy when scaling isn't
4314 // mentioned — `without_scaling()` is the only way to
4315 // force `config.scaling == None`.)
4316 let config = EventBusConfig::builder()
4317 .num_shards(1)
4318 .ring_buffer_capacity(1024)
4319 .without_scaling()
4320 .build()
4321 .unwrap();
4322 let bus = EventBus::new(config).await.unwrap();
4323 let bus = Arc::new(bus);
4324 bus.start_scaling_monitor();
4325 assert!(
4326 bus.scaling_monitor.lock().is_none(),
4327 "non-scaling bus must not install a monitor",
4328 );
4329
4330 // Scaling enabled: first call installs, second is a no-op
4331 // (the existing handle stays put — proves the early-return
4332 // didn't overwrite the slot).
4333 let policy = crate::shard::ScalingPolicy {
4334 min_shards: 2,
4335 max_shards: 4,
4336 ..Default::default()
4337 };
4338 let config = EventBusConfig::builder()
4339 .num_shards(2)
4340 .ring_buffer_capacity(1024)
4341 .scaling(policy)
4342 .build()
4343 .unwrap();
4344 let bus = Arc::new(EventBus::new(config).await.unwrap());
4345 // EventBus::new doesn't auto-start the monitor — we start
4346 // it explicitly so the test owns the lifecycle.
4347 bus.start_scaling_monitor();
4348 let handle_id_after_first = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4349 assert!(handle_id_after_first.is_some(), "first start must install");
4350
4351 // Second call must NOT replace the handle. If the early-
4352 // return is broken, the slot would carry a new handle id.
4353 bus.start_scaling_monitor();
4354 let handle_id_after_second = bus.scaling_monitor.lock().as_ref().map(|h| h.id());
4355 assert_eq!(
4356 handle_id_after_first, handle_id_after_second,
4357 "second start_scaling_monitor must NOT replace the running handle",
4358 );
4359 }
4360}