net/bus.rs
1//! Main EventBus facade.
2//!
3//! The EventBus provides a unified API for:
4//! - Event ingestion (non-blocking)
5//! - Event consumption (async polling with filtering)
6//! - Lifecycle management
7
8use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::mpsc;
13use tokio::task::JoinHandle;
14
15use crate::adapter::{Adapter, NoopAdapter};
16use crate::config::{AdapterConfig, BatchConfig, EventBusConfig};
17use crate::consumer::{ConsumeRequest, ConsumeResponse, PollMerger};
18use crate::error::{AdapterError, ConsumerError, IngestionError, IngestionResult};
19use crate::event::{Batch, Event, RawEvent};
20use crate::shard::{BatchWorker, ScalingDecision, ShardManager, ShardMetrics};
21
22#[cfg(feature = "jetstream")]
23use crate::adapter::JetStreamAdapter;
24#[cfg(feature = "net")]
25use crate::adapter::NetAdapter;
26#[cfg(feature = "redis")]
27use crate::adapter::RedisAdapter;
28
/// The main event bus.
///
/// Bundles the sharded ingestion pipeline (a [`ShardManager`] plus one
/// drain worker and one batch worker per shard), the storage [`Adapter`]
/// that batches are dispatched to, and the [`PollMerger`] used to consume
/// across shards.
///
/// # Example
///
/// ```rust,ignore
/// use net::{ConsumeRequest, Event, EventBus, EventBusConfig};
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let bus = EventBus::new(EventBusConfig::default()).await?;
///
///     // Ingest events (non-blocking).
///     bus.ingest(Event::from_str(r#"{"token": "hello"}"#)?)?;
///
///     // Poll events across all shards.
///     let _response = bus.poll(ConsumeRequest::new(100)).await?;
///
///     // Await shutdown: dropping the bus without it can lose events
///     // still buffered in the ring buffers / channels.
///     bus.shutdown().await?;
///     Ok(())
/// }
/// ```
pub struct EventBus {
    /// Shard manager for parallel ingestion: owns the per-shard ring
    /// buffers and the routing table producers are dispatched through.
    shard_manager: Arc<ShardManager>,
    /// Adapter for durable storage; every dispatched batch ends up here.
    adapter: Arc<dyn Adapter>,
    /// Poll merger for cross-shard consumption. Republished with the
    /// current shard-id set after each topology change.
    poll_merger: arc_swap::ArcSwap<PollMerger>,
    /// Serializes the `shard_manager.shard_ids() → poll_merger.store`
    /// sequence in `add_shard_internal` / `remove_shard_internal`.
    ///
    /// Without it, two topology changes (e.g. the scaling monitor's
    /// add_shard racing manual_scale_down's remove_shard) can snapshot
    /// the shard ids at different moments and then race on the
    /// `ArcSwap::store`, letting a stale snapshot land last. Example:
    /// T1 reads `{0..5}`, T2 reads `{1..4}`, T2 stores `{1..4}`, T1
    /// stores `{0..5}` — the published merger now routes polls to the
    /// just-removed shard 0 until the next topology change repairs it.
    poll_merger_swap_lock: parking_lot::Mutex<()>,
    /// Per-shard worker handles, keyed by shard id. Kept separate from
    /// the senders so teardown can await drain workers *before* batch
    /// workers: otherwise the drain worker's final sweep races the batch
    /// worker's exit, and anything the drain worker pushes after the
    /// batch worker stopped reading is silently lost.
    batch_workers: parking_lot::Mutex<std::collections::HashMap<u16, ShardWorkers>>,
    /// Bus-side channel senders into each shard's batch worker
    /// (shard_id → sender). Removing an entry is the first step of
    /// tearing a shard down; the drain worker holds its own clone.
    batch_senders: parking_lot::RwLock<
        std::collections::HashMap<u16, mpsc::Sender<Vec<crate::event::InternalEvent>>>,
    >,
    /// Shutdown flag, shared with every drain worker.
    shutdown: Arc<AtomicBool>,
    /// Gate telling drain workers that the in-flight wait has completed
    /// and they may run their final ring-buffer sweep.
    ///
    /// Distinct from `shutdown` because observing `shutdown == true` is
    /// not enough: a producer that read `shutdown == false` may still be
    /// mid-push, and a drain worker that finishes its final sweep before
    /// that push is visible strands the event. `shutdown()` sets this
    /// only after `in_flight_ingests` reaches zero; the drain side's
    /// Acquire load synchronizes-with that Release store, chaining
    /// through the SeqCst in-flight handshake so every push observed
    /// before shutdown is visible to the drain worker's subsequent
    /// `pop_batch_into`.
    drain_finalize_ready: Arc<AtomicBool>,
    /// Number of ingests currently between their shutdown check and the
    /// end of their push (decremented by `IngestGuard`'s `Drop`).
    ///
    /// `shutdown()` waits for this to reach zero *after* setting
    /// `shutdown = true` and *before* setting `drain_finalize_ready`,
    /// so no producer is mid-push during the drain workers' final sweep.
    /// That closes the race where a producer that observed
    /// `shutdown == false` pushes after the drain worker's last
    /// `pop_batch_into` returned zero, stranding the event.
    ///
    /// `AtomicU64` (previously `AtomicU32`): the counter is part of the
    /// shutdown protocol, and a wrap to 0 would let the wait-for-zero
    /// loop proceed while producers were still mid-push. At 64 bits a
    /// wrap needs ~1.8e19 concurrent ingests.
    in_flight_ingests: AtomicU64,
    /// Set to `true` once `shutdown()` has run to completion. `Drop`
    /// uses it to detect "dropped without an awaited shutdown", in which
    /// case events still in the ring buffers / mpsc channels are lost
    /// (see the `Drop` impl).
    shutdown_completed: AtomicBool,
    /// Configuration the bus was built with.
    config: EventBusConfig,
    /// Statistics, shared with every batch worker.
    stats: Arc<EventBusStats>,
    /// Producer nonce stamped on every batch the bus emits.
    ///
    /// Loaded from `config.producer_nonce_path` at startup when a path is
    /// configured, otherwise the per-process default from
    /// `event::batch_process_nonce`. Copied into
    /// `BatchWorkerParams::producer_nonce` for each worker, and passed to
    /// `Batch::with_nonce` by the teardown paths' stranded-event flushes.
    producer_nonce: u64,
    /// Handle of the scaling-monitor task; `None` until
    /// `start_scaling_monitor` spawns it.
    scaling_monitor: parking_lot::Mutex<Option<JoinHandle<()>>>,
}
133
/// Worker handles for a single shard.
///
/// The drain worker pumps events from the shard's ring buffer into an
/// mpsc channel; the batch worker reads from that channel and dispatches
/// batches to the adapter. Teardown must await `drain` before `batch`:
/// the batch worker's `recv()` only returns `None` once every sender
/// clone — including the drain worker's — has been dropped, so joining
/// the batch worker first would park forever. See `EventBus::shutdown`.
struct ShardWorkers {
    /// Batch worker task: channel → adapter.
    batch: JoinHandle<()>,
    /// Drain worker task: ring buffer → channel.
    drain: JoinHandle<()>,
    /// Bus-owned mirror of the BatchWorker's `next_sequence`.
    ///
    /// The shard-teardown paths read this AFTER awaiting `batch` to learn
    /// the worker's final post-flush sequence, and use it as the
    /// `sequence_start` of the stranded-ring-buffer flush. That keeps the
    /// stranded msg-ids strictly past every msg-id the worker emitted;
    /// if both started at 0, JetStream dedup would silently drop the
    /// stranded batch.
    next_sequence: Arc<AtomicU64>,
}
150
/// RAII guard for one in-flight ingest.
///
/// Dropping the guard decrements `EventBus::in_flight_ingests` (SeqCst),
/// so every exit path of an ingest — completed push or early bail —
/// releases its slot and `shutdown()` can wait for the counter to reach
/// zero. The matching increment is performed by whoever creates the guard
/// (presumably the ingest path — not visible in this section).
struct IngestGuard<'a> {
    /// Bus whose in-flight counter this guard releases on drop.
    bus: &'a EventBus,
}
157
158impl Drop for IngestGuard<'_> {
159 fn drop(&mut self) {
160 self.bus
161 .in_flight_ingests
162 .fetch_sub(1, AtomicOrdering::SeqCst);
163 }
164}
165
/// Event bus statistics.
///
/// Every field is an atomic so the counters can be updated by the bus and
/// its workers, and read through `bus.stats()`, without locking.
#[derive(Debug, Default)]
pub struct EventBusStats {
    /// Total events ingested.
    pub events_ingested: AtomicU64,
    /// Events that were accepted but will never reach the adapter.
    ///
    /// Covers backpressure drops and events abandoned during teardown —
    /// e.g. the add-shard activation-failure rollback counts its stranded
    /// events here when it cannot safely flush them.
    pub events_dropped: AtomicU64,
    /// Batches successfully handed to the adapter.
    ///
    /// Incremented by each BatchWorker after a successful
    /// `dispatch_batch`, and by the shard-teardown stranded-event
    /// flushes. `flush()`'s Phase 2 progress probe reads it to decide
    /// whether the BatchWorkers made progress during a `max_delay`
    /// window. (It was once declared but never incremented, so that
    /// probe always saw `0 == 0` and gave up after a single window —
    /// which made `flush_is_a_delivery_barrier` flaky on coarse-timer
    /// platforms such as Windows.)
    pub batches_dispatched: AtomicU64,
    /// Total events dispatched to the adapter (sum of batch lengths from
    /// successful `on_batch` calls). Companion to `batches_dispatched`:
    /// by the time `flush()` returns,
    /// `events_dispatched + events_dropped == events_ingested`. FFI
    /// consumers can also use it to monitor end-to-end delivery.
    pub events_dispatched: AtomicU64,
    /// Set to `true` when the bus knowingly lost events during teardown.
    ///
    /// This happens when `shutdown()` / `shutdown_via_ref()` proceeds past
    /// the in-flight-ingest grace deadline (5 s) with producers still
    /// mid-push. It also happens when the add-shard activation-failure
    /// rollback has to abandon stranded events because a worker could not
    /// be joined.
    ///
    /// The lost events are also counted into `events_dropped`. `shutdown`
    /// returns `Ok(())` either way, so callers that must distinguish a
    /// clean shutdown from a lossy one check this flag in `bus.stats()`
    /// afterwards. That check is only meaningful on the `shutdown_via_ref`
    /// path, which doesn't consume the bus.
    pub shutdown_was_lossy: AtomicBool,
}
205
206impl EventBus {
207 /// Create a new event bus with the given configuration.
208 pub async fn new(config: EventBusConfig) -> Result<Self, AdapterError> {
209 // Create adapter from config
210 let adapter: Box<dyn Adapter> = match &config.adapter {
211 AdapterConfig::Noop => Box::new(NoopAdapter::new()),
212 #[cfg(feature = "redis")]
213 AdapterConfig::Redis(redis_config) => {
214 Box::new(RedisAdapter::new(redis_config.clone())?)
215 }
216 #[cfg(feature = "jetstream")]
217 AdapterConfig::JetStream(js_config) => {
218 Box::new(JetStreamAdapter::new(js_config.clone())?)
219 }
220 #[cfg(feature = "net")]
221 AdapterConfig::Net(net_config) => Box::new(NetAdapter::new((**net_config).clone())?),
222 };
223
224 Self::new_with_adapter(config, adapter).await
225 }
226
227 /// Create a new event bus with a caller-supplied adapter.
228 ///
229 /// `config.adapter` is ignored — the supplied `adapter` is used
230 /// instead. Useful for tests that need to observe or inject
231 /// behavior at the adapter boundary (e.g. a counting adapter
232 /// for end-to-end delivery assertions, a flaky adapter for
233 /// retry-path coverage).
234 pub async fn new_with_adapter(
235 config: EventBusConfig,
236 mut adapter: Box<dyn Adapter>,
237 ) -> Result<Self, AdapterError> {
238 config
239 .validate()
240 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
241
242 // Initialize adapter (with timeout to prevent hanging on unreachable backends)
243 tokio::time::timeout(config.adapter_timeout, adapter.init())
244 .await
245 .map_err(|_| AdapterError::Fatal("adapter init timed out".into()))??;
246 let adapter: Arc<dyn Adapter> = Arc::from(adapter);
247
248 // Create shard manager (with or without dynamic scaling)
249 let shard_manager = if let Some(ref scaling_policy) = config.scaling {
250 Arc::new(
251 ShardManager::with_mapper(
252 config.num_shards,
253 config.ring_buffer_capacity,
254 config.backpressure_mode,
255 scaling_policy.clone(),
256 )
257 .map_err(|e| AdapterError::Fatal(e.to_string()))?,
258 )
259 } else {
260 Arc::new(ShardManager::new(
261 config.num_shards,
262 config.ring_buffer_capacity,
263 config.backpressure_mode,
264 ))
265 };
266
267 // Create poll merger.
268 //
269 // Pass the live id set rather than the count. At
270 // initial construction the ids are dense (`0..num_shards`),
271 // but using `shard_ids()` here keeps a single code path with
272 // the post-scaling re-stores below.
273 let poll_merger = arc_swap::ArcSwap::from_pointee(PollMerger::new(
274 adapter.clone(),
275 shard_manager.shard_ids(),
276 ));
277
278 // Shutdown flag and drain-finalize gate. See `drain_finalize_ready`
279 // doc on `EventBus` for the synchronization contract.
280 let shutdown = Arc::new(AtomicBool::new(false));
281 let drain_finalize_ready = Arc::new(AtomicBool::new(false));
282
283 // Stats are shared with every BatchWorker spawn so successful
284 // dispatches increment `batches_dispatched` / `events_dispatched`
285 // — `flush()`'s Phase 2 progress probe gates on those.
286 let stats = Arc::new(EventBusStats::default());
287
288 // Producer nonce. Persistent path → load-or-create
289 // the durable u64 so cross-process retries dedup against the
290 // prior incarnation. No path → per-process default (today's
291 // at-most-once-across-restart behavior).
292 let producer_nonce = match &config.producer_nonce_path {
293 Some(path) => crate::adapter::PersistentProducerNonce::load_or_create(path)
294 .map_err(|e| {
295 AdapterError::Fatal(format!(
296 "failed to load/create producer-nonce file at {}: {e}",
297 path.display(),
298 ))
299 })?
300 .nonce(),
301 None => crate::event::batch_process_nonce(),
302 };
303
304 // Create batch workers for each shard
305 let mut batch_workers: std::collections::HashMap<u16, ShardWorkers> =
306 std::collections::HashMap::with_capacity(config.num_shards as usize);
307 let mut batch_senders =
308 std::collections::HashMap::with_capacity(config.num_shards as usize);
309
310 for shard_id in 0..config.num_shards {
311 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
312
313 let next_sequence = Arc::new(AtomicU64::new(0));
314
315 let batch = spawn_batch_worker(BatchWorkerParams {
316 shard_id,
317 rx,
318 adapter: adapter.clone(),
319 shard_manager: shard_manager.clone(),
320 config: config.batch.clone(),
321 adapter_timeout: config.adapter_timeout,
322 batch_retries: config.adapter_batch_retries,
323 next_sequence: next_sequence.clone(),
324 stats: stats.clone(),
325 producer_nonce,
326 });
327
328 let drain = spawn_drain_worker_for_shard(
329 shard_id,
330 shard_manager.clone(),
331 tx.clone(),
332 shutdown.clone(),
333 drain_finalize_ready.clone(),
334 );
335
336 batch_workers.insert(
337 shard_id,
338 ShardWorkers {
339 batch,
340 drain,
341 next_sequence,
342 },
343 );
344 batch_senders.insert(shard_id, tx);
345 }
346
347 let bus = Self {
348 shard_manager,
349 adapter,
350 poll_merger,
351 poll_merger_swap_lock: parking_lot::Mutex::new(()),
352 batch_workers: parking_lot::Mutex::new(batch_workers),
353 batch_senders: parking_lot::RwLock::new(batch_senders),
354 shutdown,
355 drain_finalize_ready,
356 in_flight_ingests: AtomicU64::new(0),
357 shutdown_completed: AtomicBool::new(false),
358 config,
359 stats,
360 producer_nonce,
361 scaling_monitor: parking_lot::Mutex::new(None),
362 };
363
364 Ok(bus)
365 }
366
367 /// Start the scaling monitor (if dynamic scaling is enabled).
368 /// This spawns a background task that periodically evaluates scaling decisions.
369 ///
370 /// The spawned task holds a `Weak<Self>` rather than a strong
371 /// `Arc<Self>` clone. With a strong clone the task kept the bus
372 /// alive forever, and `shutdown(self)` (which consumes by value)
373 /// was unreachable: callers with an `Arc<EventBus>` could not
374 /// `Arc::try_unwrap` to consume it because the spawned task always
375 /// held one of the strong refs.
376 ///
377 /// With a `Weak`, the monitor task upgrades each tick. Once the
378 /// last caller-held `Arc` is dropped, the upgrade fails and the
379 /// task exits cleanly. To shut down via `shutdown(self)`, the
380 /// caller must hold the only strong reference: `Arc::try_unwrap`
381 /// on the resulting bus succeeds because the spawned task only
382 /// holds a Weak.
383 pub fn start_scaling_monitor(self: &Arc<Self>) {
384 if self.config.scaling.is_none() {
385 return;
386 }
387
388 // Idempotency check: no-op when a monitor is already
389 // installed. Otherwise a second `start_scaling_monitor`
390 // call would overwrite the slot without aborting the
391 // previous `JoinHandle` — the displaced task would keep
392 // running, holding a `Weak<EventBus>`, only exiting when it
393 // next observed `shutdown` or failed to upgrade. Two
394 // concurrent monitors would briefly compete for
395 // `evaluate_scaling`'s lock, doubling the metrics-tick
396 // wakeup rate.
397 let mut slot = self.scaling_monitor.lock();
398 if slot.is_some() {
399 tracing::debug!("start_scaling_monitor: monitor already running, skipping");
400 return;
401 }
402
403 let weak = Arc::downgrade(self);
404 let handle = tokio::spawn(async move {
405 run_scaling_monitor_via_weak(weak).await;
406 });
407
408 *slot = Some(handle);
409 }
410
411 /// Internal: Add a new shard with its workers.
412 ///
413 /// The previous implementation called `shard_manager.add_shard()`
414 /// first, which atomically marked the shard `Active` and published
415 /// it to the routing table. So `select_shard` could route producer
416 /// pushes to the new id *before* any drain or batch worker existed,
417 /// leaving events queued in a buffer with no consumer (and
418 /// triggering the configured backpressure mode if the buffer
419 /// filled).
420 ///
421 /// The fix uses the two-phase API on `ShardManager`:
422 /// 1. `add_shard()` allocates the id and metrics collector,
423 /// adds the shard to the routing table in `Provisioning`
424 /// state — so `with_shard` works (which the drain worker
425 /// needs) but `select_shard` skips it.
426 /// 2. Spawn batch + drain workers and register the sender.
427 /// 3. `activate_shard()` flips state to `Active`. Only now
428 /// does `select_shard` start routing producer pushes.
429 async fn add_shard_internal(&self) -> Result<u16, AdapterError> {
430 self.add_shard_internal_with_cooldown_policy(false).await
431 }
432
433 /// Like [`add_shard_internal`] but bypasses the auto-scaling
434 /// cooldown. See [`ShardManager::add_shard_force`].
435 async fn add_shard_internal_force(&self) -> Result<u16, AdapterError> {
436 self.add_shard_internal_with_cooldown_policy(true).await
437 }
438
439 async fn add_shard_internal_with_cooldown_policy(
440 &self,
441 force: bool,
442 ) -> Result<u16, AdapterError> {
443 // Step 1: provisioning add — not yet selectable.
444 let new_id = if force {
445 self.shard_manager.add_shard_force()
446 } else {
447 self.shard_manager.add_shard()
448 }
449 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
450
451 // Step 2: spawn workers and register the sender.
452 let (tx, rx) = mpsc::channel::<Vec<crate::event::InternalEvent>>(1024);
453
454 let next_sequence = Arc::new(AtomicU64::new(0));
455
456 let batch = spawn_batch_worker(BatchWorkerParams {
457 shard_id: new_id,
458 rx,
459 adapter: self.adapter.clone(),
460 shard_manager: self.shard_manager.clone(),
461 config: self.config.batch.clone(),
462 adapter_timeout: self.config.adapter_timeout,
463 batch_retries: self.config.adapter_batch_retries,
464 next_sequence: next_sequence.clone(),
465 stats: self.stats.clone(),
466 producer_nonce: self.producer_nonce,
467 });
468
469 // Order of publish here matters. Today `select_shard`
470 // filters on `state == Active` and a shard is still
471 // `Provisioning` at this point, so producers cannot route
472 // to it yet — but the pattern (insert sender → spawn drain
473 // worker → activate_shard) is ordering-fragile: a future
474 // refactor that flips `activate_shard` ahead of the drain-
475 // worker spawn would leave a window where producers can
476 // push but no drain worker is polling. Pin the order by
477 // installing the sender BEFORE the drain worker (existing
478 // behaviour) so the worker observes a fully-published
479 // sender at spawn time; `activate_shard` runs further down.
480 self.batch_senders.write().insert(new_id, tx.clone());
481
482 let drain = spawn_drain_worker_for_shard(
483 new_id,
484 self.shard_manager.clone(),
485 tx,
486 self.shutdown.clone(),
487 self.drain_finalize_ready.clone(),
488 );
489
490 self.batch_workers.lock().insert(
491 new_id,
492 ShardWorkers {
493 batch,
494 drain,
495 next_sequence,
496 },
497 );
498
499 // Step 3: workers are live — flip the shard to Active so
500 // `select_shard` will route to it.
501 //
502 // On `activate_shard` failure we mirror
503 // `remove_shard_internal`'s teardown: drop the sender,
504 // unmap the provisioning entry (which atomically pops any
505 // residual ring-buffer events), gracefully await both
506 // workers (so the drain worker's `scratch` Vec sends its
507 // contents on the channel and the batch worker's
508 // `current_batch` is flushed via the channel-close path),
509 // then dispatch any stranded ring-buffer events through
510 // the adapter. Pre-fix this used `.abort()` on both
511 // handles which dropped the drain
512 // worker's scratch and the batch worker's current_batch
513 // without dispatch.
514 if let Err(e) = self.shard_manager.activate_shard(new_id) {
515 tracing::warn!(
516 shard_id = new_id,
517 error = %e,
518 "activate_shard failed; rolling back provisioning state"
519 );
520
521 // 1. Drop the bus-side sender. The drain worker still
522 // holds its own clone, so the channel stays open
523 // until `with_shard` returns None (step 2) and the
524 // drain worker breaks out of its loop, dropping
525 // its sender and finally closing the channel.
526 self.batch_senders.write().remove(&new_id);
527
528 // 2. Atomically pop any ring-buffer residue and
529 // unmap the Provisioning entry. After this,
530 // `with_shard(new_id)` returns None and the drain
531 // worker exits at its next poll (after sending
532 // any events it had already popped into `scratch`
533 // on this iteration).
534 //
535 // For a brand-new Provisioning shard the buffer
536 // should be empty (`select_shard` skips
537 // non-Active states), so `stranded` is normally
538 // `Vec::new()`. The flush below is a defensive
539 // no-op on the happy path but covers any future
540 // code path that routes to a Provisioning shard
541 // or any race window that tucked an event in
542 // before `activate_shard` returned its error.
543 let stranded = self.shard_manager.remove_shard(new_id).unwrap_or_default();
544
545 // 3. Take ownership of the worker handles and await
546 // them gracefully. Order: drain first (it pumps
547 // its scratch + final-sweep contents into the
548 // channel and exits), then batch (which receives
549 // those events plus any prior channel residue,
550 // flushes its own `current_batch`, and exits).
551 //
552 // Awaiting in this order is what makes the drain
553 // worker's scratch reach the adapter — the
554 // drain's `Some(N>0)` arm `mem::replace`s scratch
555 // into a batch and `sender.send(batch).await`s
556 // it; that send must complete (or fail) before
557 // the drain worker breaks. The batch worker's
558 // `Ok(None)` arm then runs after both senders
559 // are dropped and flushes any pending batch.
560 // Bound each JoinHandle await so a worker that's
561 // parked inside a slow adapter call (e.g. drain worker
562 // mid-`sender.send().await` against a backpressured
563 // channel because the batch worker is itself blocked
564 // in the adapter) cannot pin rollback indefinitely.
565 // 2x `adapter_timeout` is the natural ceiling: the
566 // batch worker uses `adapter_timeout` per dispatch and
567 // is expected to wake within that window. A timeout
568 // here leaks the JoinHandle — acceptable because
569 // step 1 already removed the bus-side sender, so the
570 // detached task can't observe new work and will exit
571 // on its next loop iteration.
572 let workers = self.batch_workers.lock().remove(&new_id);
573 // `worker_detached` is set when either join times out
574 // — the worker is still running on a leaked
575 // JoinHandle. In that case the `next_sequence` atomic
576 // is no longer a reliable upper bound: the detached
577 // worker may publish a final batch whose msg-ids land
578 // in the same `[next_sequence..N]` range we'd use for
579 // the stranded-flush, producing duplicate XADDs or
580 // JetStream dedup hits. Skip the stranded-flush when
581 // detached and surface the loss explicitly so the
582 // operator sees it.
583 let mut worker_detached = false;
584 let final_next_sequence = if let Some(workers) = workers {
585 let bound = self.config.adapter_timeout.saturating_mul(2);
586 match tokio::time::timeout(bound, workers.drain).await {
587 Ok(Ok(())) => {}
588 Ok(Err(err)) => {
589 tracing::warn!(
590 shard_id = new_id,
591 error = %err,
592 "drain worker JoinHandle errored on activate-failure rollback"
593 );
594 }
595 Err(_) => {
596 tracing::warn!(
597 shard_id = new_id,
598 timeout_ms = bound.as_millis() as u64,
599 "drain worker did not exit within timeout on activate-failure rollback; detaching"
600 );
601 worker_detached = true;
602 }
603 }
604 match tokio::time::timeout(bound, workers.batch).await {
605 Ok(Ok(())) => {}
606 Ok(Err(err)) => {
607 tracing::warn!(
608 shard_id = new_id,
609 error = %err,
610 "BatchWorker JoinHandle errored on activate-failure rollback"
611 );
612 }
613 Err(_) => {
614 tracing::warn!(
615 shard_id = new_id,
616 timeout_ms = bound.as_millis() as u64,
617 "BatchWorker did not exit within timeout on activate-failure rollback; detaching"
618 );
619 worker_detached = true;
620 }
621 }
622 workers.next_sequence.load(AtomicOrdering::Acquire)
623 } else {
624 0
625 };
626
627 // 4. Dispatch any stranded events as a single-shot
628 // batch so they reach durable storage with the
629 // correct sequence-id segment. Identical to the
630 // `remove_shard_internal` teardown — but only when
631 // the worker actually exited. If it timed out and
632 // is still running on a leaked handle, dispatching
633 // here would emit msg-ids overlapping the worker's
634 // final flush; surface the events as dropped
635 // instead so the duplicate-on-the-wire hazard is
636 // avoided.
637 if !stranded.is_empty() && worker_detached {
638 let count = stranded.len();
639 self.stats
640 .events_dropped
641 .fetch_add(count as u64, AtomicOrdering::Relaxed);
642 self.stats
643 .shutdown_was_lossy
644 .store(true, AtomicOrdering::Release);
645 tracing::error!(
646 shard_id = new_id,
647 count,
648 "activate-failure rollback: skipping stranded-flush \
649 because a worker JoinHandle timed out and may still \
650 be running; events would collide with the detached \
651 worker's final batch on the wire. Counted as dropped."
652 );
653 } else if !stranded.is_empty() {
654 let count = stranded.len();
655 let batch = crate::event::Batch::with_nonce(
656 new_id,
657 stranded,
658 final_next_sequence,
659 self.producer_nonce,
660 );
661 let dispatched = dispatch_batch(
662 &*self.adapter,
663 batch,
664 new_id,
665 self.config.adapter_timeout,
666 self.config.adapter_batch_retries,
667 )
668 .await;
669 if dispatched {
670 self.stats
671 .batches_dispatched
672 .fetch_add(1, AtomicOrdering::Relaxed);
673 self.stats
674 .events_dispatched
675 .fetch_add(count as u64, AtomicOrdering::Relaxed);
676 tracing::info!(
677 shard_id = new_id,
678 count,
679 "activate-failure rollback: flushed stranded events to adapter",
680 );
681 } else {
682 tracing::error!(
683 shard_id = new_id,
684 count,
685 "activate-failure rollback: adapter rejected stranded events; \
686 events lost"
687 );
688 }
689 }
690
691 return Err(AdapterError::Fatal(e.to_string()));
692 }
693
694 // Update poll merger with the post-add id set. Hold
695 // `poll_merger_swap_lock` across the snapshot-and-store so a
696 // concurrent remove_shard can't sneak between our `shard_ids()`
697 // read and our `arc_swap.store` and clobber the published view
698 // with a stale snapshot.
699 {
700 let _swap_guard = self.poll_merger_swap_lock.lock();
701 self.poll_merger.store(Arc::new(PollMerger::new(
702 self.adapter.clone(),
703 self.shard_manager.shard_ids(),
704 )));
705 }
706
707 tracing::info!(shard_id = new_id, "Added new shard");
708 Ok(new_id)
709 }
710
711 /// Internal: Remove a stopped shard.
712 ///
713 /// Previously this dropped the worker `JoinHandle`s and unmapped
714 /// the shard without first draining its ring buffer. Any events
715 /// still queued at the moment of removal — even just a few from a
716 /// producer that pushed concurrently with the scale-down decision
717 /// — were silently stranded once the drain worker exited on
718 /// `with_shard → None`.
719 ///
720 /// The fix:
721 /// 1. Wait for the drain worker we're about to retire to flush
722 /// the channel, by closing the bus-side sender first.
723 /// 2. Call `remove_shard`, which atomically pops the
724 /// ring-buffer remnants and unmaps the shard. The drained
725 /// events come back to us as a `Vec`.
726 /// 3. Hand those events directly to the adapter as a
727 /// single-shot batch — bypassing the per-shard pipeline
728 /// that's already being torn down — so they reach durable
729 /// storage.
730 async fn remove_shard_internal(&self, shard_id: u16) -> Result<(), AdapterError> {
731 // Step 1: drop the bus-side sender. The drain worker still
732 // has its own clone and will keep draining; we want it to
733 // exit when its `with_shard` call returns `None` after
734 // step 2's unmap.
735 self.batch_senders.write().remove(&shard_id);
736
737 // Step 2: atomically drain whatever's in the ring buffer and
738 // unmap. After this, `with_shard(shard_id)` returns `None`
739 // and the drain worker exits at its next poll.
740 let stranded = self
741 .shard_manager
742 .remove_shard(shard_id)
743 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
744
745 // Step 3: take ownership of the worker handles (move them
746 // OUT of the mutex map so we can `await` them — `await`
747 // consumes a `JoinHandle`). With the bus-side sender already
748 // dropped (step 1) and the shard unmapped (step 2), the
749 // drain worker exits at its next poll and drops its sender
750 // clone, which closes the BatchWorker's `rx`. The
751 // BatchWorker then flushes any pending `current_batch` and
752 // any events still buffered in the channel, dispatches them
753 // via the standard `dispatch_batch` path with their PROPER
754 // `next_sequence` values, and exits.
755 //
756 // Await order: drain first, then batch. The drain worker's
757 // `Some(N>0)` arm `mem::replace`s scratch into a batch and
758 // `sender.send(batch).await`s it; that send must complete
759 // (or fail) before drain breaks. The batch worker's
760 // `Ok(None)` arm runs after the sender drops and flushes
761 // any pending batch. Awaiting in the reverse order would
762 // park here forever: the batch worker's `recv()` only
763 // returns `None` once every sender clone (including the
764 // drain worker's) has dropped.
765 //
766 // Both awaits are bounded by `2 × adapter_timeout` so a
767 // worker parked inside a slow adapter call cannot pin
768 // teardown indefinitely. A timeout leaks the JoinHandle —
769 // acceptable because step 1 already removed the bus-side
770 // sender and step 2 unmapped the shard, so the detached
771 // task can't observe new work and will exit on its next
772 // loop iteration.
773 let workers = self.batch_workers.lock().remove(&shard_id);
774 let final_next_sequence = if let Some(workers) = workers {
775 let bound = self.config.adapter_timeout.saturating_mul(2);
776 match tokio::time::timeout(bound, workers.drain).await {
777 Ok(Ok(())) => {}
778 Ok(Err(e)) => {
779 tracing::warn!(
780 shard_id,
781 error = %e,
782 "drain worker JoinHandle errored on await; \
783 drain worker should have already exited via \
784 `with_shard -> None`",
785 );
786 }
787 Err(_) => {
788 tracing::warn!(
789 shard_id,
790 timeout_ms = bound.as_millis() as u64,
791 "drain worker did not exit within timeout on remove_shard; detaching",
792 );
793 }
794 }
795 match tokio::time::timeout(bound, workers.batch).await {
796 Ok(Ok(())) => {}
797 Ok(Err(e)) => {
798 tracing::warn!(
799 shard_id,
800 error = %e,
801 "BatchWorker JoinHandle errored on await; \
802 proceeding with stranded-flush using last \
803 published next_sequence",
804 );
805 }
806 Err(_) => {
807 tracing::warn!(
808 shard_id,
809 timeout_ms = bound.as_millis() as u64,
810 "BatchWorker did not exit within timeout on remove_shard; detaching",
811 );
812 }
813 }
814 workers.next_sequence.load(AtomicOrdering::Acquire)
815 } else {
816 // No worker registered for this shard — shouldn't
817 // happen on the normal scale-down path, but defensively
818 // fall back to 0. This branch only activates if a
819 // caller manages to call `remove_shard_internal` for a
820 // shard that was never spawned (or already removed).
821 0
822 };
823
824 // Step 4: flush the stranded ring-buffer events through the
825 // adapter in one shot, using `final_next_sequence` (NOT 0)
826 // as the `sequence_start`. The stranded batch's msg-ids
827 // are `{nonce}:{shard_id}:{final_next_sequence}:{i}` —
828 // strictly past every msg-id the worker emitted. Using 0
829 // would collide with the worker's very first batch
830 // (`{nonce}:{shard_id}:0:{i}`), and JetStream's 2 min dedup
831 // window would silently drop the duplicate.
832 if !stranded.is_empty() {
833 let count = stranded.len();
834 // Use the bus's loaded producer nonce so the stranded
835 // batch's msg-ids share the same producer-identity
836 // segment as everything else this bus has emitted —
837 // critical for cross-process dedup.
838 let batch = crate::event::Batch::with_nonce(
839 shard_id,
840 stranded,
841 final_next_sequence,
842 self.producer_nonce,
843 );
844 let dispatched = dispatch_batch(
845 &*self.adapter,
846 batch,
847 shard_id,
848 self.config.adapter_timeout,
849 self.config.adapter_batch_retries,
850 )
851 .await;
852 if dispatched {
853 self.stats
854 .batches_dispatched
855 .fetch_add(1, AtomicOrdering::Relaxed);
856 self.stats
857 .events_dispatched
858 .fetch_add(count as u64, AtomicOrdering::Relaxed);
859 tracing::info!(
860 shard_id,
861 count,
862 sequence_start = final_next_sequence,
863 "Removed shard: flushed stranded ring-buffer events to adapter"
864 );
865 } else {
866 tracing::error!(
867 shard_id,
868 count,
869 sequence_start = final_next_sequence,
870 "Removed shard: adapter rejected stranded ring-buffer events; \
871 events lost"
872 );
873 }
874 }
875
876 // Update poll merger with the post-remove id set.
877 // Without this, a default-shards poll (`request.shards == None`)
878 // would still iterate `0..num_shards` and skip the live shard
879 // whose id is now the largest, while polling a nonexistent /
880 // recreated shard at the bottom of the range.
881 //
882 // `poll_merger_swap_lock` serializes against
883 // `add_shard_internal`'s matching block — see the field doc.
884 {
885 let _swap_guard = self.poll_merger_swap_lock.lock();
886 self.poll_merger.store(Arc::new(PollMerger::new(
887 self.adapter.clone(),
888 self.shard_manager.shard_ids(),
889 )));
890 }
891
892 tracing::info!(shard_id = shard_id, "Removed shard");
893 Ok(())
894 }
895
896 /// Try to enter an ingest critical section. Returns `None` if
897 /// shutdown is in progress, in which case the caller must
898 /// return `IngestionError::ShuttingDown` without touching the
899 /// shard manager.
900 ///
901 /// The `fetch_add` + load(`shutdown`) sequence pairs with
902 /// `shutdown()`'s store(`shutdown=true`) + wait-for-zero on
903 /// `in_flight_ingests`. SeqCst on both sides closes the
904 /// stranding race: every ingest that is observed as in-flight
905 /// during shutdown's wait is guaranteed to complete before the
906 /// drain workers do their final ring-buffer sweep, so no event
907 /// can land in a ring buffer after the drain worker has stopped
908 /// reading from it.
909 #[inline(always)]
910 fn try_enter_ingest(&self) -> Option<IngestGuard<'_>> {
911 self.in_flight_ingests.fetch_add(1, AtomicOrdering::SeqCst);
912 if self.shutdown.load(AtomicOrdering::SeqCst) {
913 self.in_flight_ingests.fetch_sub(1, AtomicOrdering::SeqCst);
914 return None;
915 }
916 Some(IngestGuard { bus: self })
917 }
918
919 /// Ingest an event.
920 ///
921 /// This is a non-blocking operation. The event is added to the appropriate
922 /// shard's ring buffer and will be batched and persisted asynchronously.
923 ///
924 /// # Returns
925 ///
926 /// The shard ID and insertion timestamp on success.
927 #[inline]
928 pub fn ingest(&self, event: Event) -> IngestionResult<(u16, u64)> {
929 let _g = self
930 .try_enter_ingest()
931 .ok_or(IngestionError::ShuttingDown)?;
932
933 match self.shard_manager.ingest(event.into_inner()) {
934 Ok((shard_id, ts)) => {
935 self.stats
936 .events_ingested
937 .fetch_add(1, AtomicOrdering::Relaxed);
938 Ok((shard_id, ts))
939 }
940 Err(e) => {
941 self.stats
942 .events_dropped
943 .fetch_add(1, AtomicOrdering::Relaxed);
944 Err(e)
945 }
946 }
947 }
948
949 /// Ingest a raw event (pre-serialized with cached hash).
950 ///
951 /// This is the fastest ingestion path:
952 /// - Uses pre-computed hash for shard selection (no serialization)
953 /// - Stores bytes directly (no clone needed, reference-counted)
954 ///
955 /// # Returns
956 ///
957 /// The shard ID and insertion timestamp on success.
958 #[inline]
959 pub fn ingest_raw(&self, event: RawEvent) -> IngestionResult<(u16, u64)> {
960 let _g = self
961 .try_enter_ingest()
962 .ok_or(IngestionError::ShuttingDown)?;
963
964 match self.shard_manager.ingest_raw(event) {
965 Ok((shard_id, ts)) => {
966 self.stats
967 .events_ingested
968 .fetch_add(1, AtomicOrdering::Relaxed);
969 Ok((shard_id, ts))
970 }
971 Err(e) => {
972 self.stats
973 .events_dropped
974 .fetch_add(1, AtomicOrdering::Relaxed);
975 Err(e)
976 }
977 }
978 }
979
980 /// Ingest a batch of events.
981 ///
982 /// This is more efficient than calling `ingest` repeatedly: events
983 /// destined for the same shard share a single mutex acquisition.
984 ///
985 /// # Returns
986 ///
987 /// The number of successfully ingested events.
988 pub fn ingest_batch(&self, events: Vec<Event>) -> usize {
989 // The shutdown gate lives in `ingest_raw_batch`, which we
990 // forward to. No separate guard here — that would double-
991 // count `in_flight_ingests` (once for this call, once for
992 // the inner call) and could deadlock shutdown under high
993 // contention.
994 let raw: Vec<RawEvent> = events.into_iter().map(|e| e.into_raw()).collect();
995 self.ingest_raw_batch(raw)
996 }
997
998 /// Ingest a batch of raw events (fastest batch ingestion).
999 ///
1000 /// Groups events by their destination shard and pushes each group
1001 /// under a single mutex acquisition.
1002 ///
1003 /// # Returns
1004 ///
1005 /// The number of successfully ingested events.
1006 pub fn ingest_raw_batch(&self, events: Vec<RawEvent>) -> usize {
1007 let _g = match self.try_enter_ingest() {
1008 Some(g) => g,
1009 None => return 0,
1010 };
1011
1012 let total = events.len();
1013 let (success, unrouted) = self.shard_manager.ingest_raw_batch(events);
1014 if success > 0 {
1015 self.stats
1016 .events_ingested
1017 .fetch_add(success as u64, AtomicOrdering::Relaxed);
1018 }
1019 // Subtract `unrouted` from the buffer-fullness drop count
1020 // so the same event isn't tallied in both `events_unrouted`
1021 // (bumped inside `ShardManager::ingest_raw_batch`) and
1022 // `events_dropped` — using a plain `total - success` here
1023 // would double-count unrouted events. Backpressure drops
1024 // = events that reached a shard but failed to push.
1025 let dropped = total.saturating_sub(success).saturating_sub(unrouted);
1026 if dropped > 0 {
1027 self.stats
1028 .events_dropped
1029 .fetch_add(dropped as u64, AtomicOrdering::Relaxed);
1030 }
1031 success
1032 }
1033
1034 /// Poll events from the bus.
1035 ///
1036 /// This retrieves events from storage according to the request parameters.
1037 ///
1038 /// # Topology-change visibility
1039 ///
1040 /// `ArcSwap::load()` snapshots the current `PollMerger` for
1041 /// the duration of this call. A concurrent `add_shard` /
1042 /// `remove_shard_internal` that `.store()`s a fresh merger
1043 /// only affects **subsequent** polls — this poll continues
1044 /// against the loaded snapshot's shard list.
1045 ///
1046 /// The implications:
1047 /// - **add_shard mid-poll:** events ingested into the new
1048 /// shard between the merger swap and our return are
1049 /// invisible to this call. They appear on the next poll.
1050 /// - **remove_shard_internal mid-poll:** the stale merger
1051 /// still has the removed shard in its id list. Adapters
1052 /// that lazy-create streams on `poll_shard` (JetStream
1053 /// in particular) may recreate the stream and return
1054 /// empty/stale data. The drained events are dispatched
1055 /// to durable storage by `remove_shard_internal` itself
1056 /// before this poll's adapter call lands; the next poll
1057 /// loads the new merger and sees the correct shard set.
1058 ///
1059 /// In both cases the loss is transient and self-healing:
1060 /// pagination via `next_id` and the next poll's cursor pick
1061 /// up where we left off. Callers requiring strict
1062 /// "topology-stable" semantics should serialize their polls
1063 /// against scaling operations externally.
1064 pub async fn poll(&self, request: ConsumeRequest) -> Result<ConsumeResponse, ConsumerError> {
1065 let merger = self.poll_merger.load();
1066 merger.poll(request).await
1067 }
1068
1069 /// Get the number of shards.
1070 pub fn num_shards(&self) -> u16 {
1071 self.shard_manager.num_shards()
1072 }
1073
1074 /// Get the adapter name.
1075 pub fn adapter_name(&self) -> &'static str {
1076 self.adapter.name()
1077 }
1078
1079 /// Check if the adapter is healthy.
1080 pub async fn is_healthy(&self) -> bool {
1081 self.adapter.is_healthy().await
1082 }
1083
1084 /// Get statistics.
1085 pub fn stats(&self) -> &EventBusStats {
1086 &self.stats
1087 }
1088
1089 /// Get shard statistics.
1090 pub fn shard_stats(&self) -> crate::shard::ShardStats {
1091 self.shard_manager.stats()
1092 }
1093
1094 /// Sum of `len()` across every shard's ring buffer.
1095 ///
1096 /// Mainly useful in tests and operational diagnostics: a
1097 /// non-zero value at the time of `Drop` (without an awaited
1098 /// `shutdown()`) would be silently lost, so `Drop` folds this
1099 /// into `events_dropped` before the bus disappears.
1100 pub fn pending_in_rings(&self) -> u64 {
1101 self.shard_manager.total_pending_in_rings()
1102 }
1103
1104 /// Flush all pending batches.
1105 ///
1106 /// Waits for all shard ring buffers to drain, then for the
1107 /// per-shard mpsc channels to drain, then for any pending batch
1108 /// inside each batch worker to time out and dispatch — and only
1109 /// then calls `adapter.flush()`.
1110 ///
1111 /// # Latency bound
1112 ///
1113 /// The total wall-clock budget is the sum of three phases:
1114 /// * Phase 1 (ring-buffer drain): up to **5 s**.
1115 /// * Phase 2 (channel + pending-batch drain): up to
1116 /// `min(2 s, batch.max_delay × n_workers)` — capped at 2 s
1117 /// so a misconfigured `max_delay` cannot inflate the budget.
1118 /// * Phase 3 (`adapter.flush()` call): up to `adapter_timeout`
1119 /// (default **30 s**).
1120 ///
1121 /// Worst-case `flush()` runtime is therefore **~37 s under
1122 /// default config**, NOT 5 s as an earlier doc-comment stated.
1123 /// Callers wiring `flush()` into request-path latencies (HTTP
1124 /// handler, RPC) MUST set `adapter_timeout` accordingly or run
1125 /// the flush under their own outer timeout. The 5-second figure
1126 /// describes Phase 1 only; the doc was misleading and is fixed
1127 /// here.
1128 ///
1129 /// The previous implementation slept a single `batch.max_delay`
1130 /// (default 10 ms) after the ring buffers drained and immediately
1131 /// called `adapter.flush()`. Events still in transit through the
1132 /// per-shard mpsc channel, the batch worker's pending batch, or
1133 /// the in-progress `adapter.on_batch` call (bounded only by
1134 /// `adapter_timeout`, default 30 s) could miss the flush. Callers
1135 /// using `flush()` as a delivery barrier silently lost events.
1136 pub async fn flush(&self) -> Result<(), AdapterError> {
1137 let start = tokio::time::Instant::now();
1138 let timeout = Duration::from_secs(5);
1139 let mut backoff = Duration::from_micros(100);
1140
1141 // Phase 1: wait for ring buffers to drain (drain workers
1142 // pump them into the per-shard mpsc channels).
1143 loop {
1144 if self.shard_manager.all_shards_empty() {
1145 break;
1146 }
1147 if start.elapsed() >= timeout {
1148 tracing::warn!("flush: ring buffers not fully drained after {:?}", timeout);
1149 break;
1150 }
1151 tokio::time::sleep(backoff).await;
1152 backoff = (backoff * 2).min(Duration::from_millis(10));
1153 }
1154
1155 // Phase 2: wait until every event ingested before flush()
1156 // started has been handed to the adapter via `on_batch`.
1157 // Snapshot the `events_ingested` counter at flush entry —
1158 // that's our target. We then poll `events_dispatched` (sum
1159 // of batch lengths from successful adapter dispatches) plus
1160 // `events_dropped` (events the adapter rejected after retry
1161 // exhaustion or that never made it past backpressure). The
1162 // barrier is met when `events_dispatched + events_dropped >=
1163 // target`: every pre-flush ingest is accounted for in one
1164 // bucket or the other.
1165 //
1166 // This is a true delivery barrier — it doesn't rely on
1167 // "no progress this window" heuristics that race a
1168 // BatchWorker whose `batch_start` was set just before
1169 // flush() ran. A progress gate that reads
1170 // `batches_dispatched` only works if that counter is
1171 // actually incremented on every dispatch, and Windows
1172 // timer resolution alone has historically made any
1173 // single-`max_delay`-sleep approach a frequent flake.
1174 let target_ingested = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1175 let dropped_at_start = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1176 let dispatched_at_start = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1177
1178 // Outer deadline still bounds Phase 2 in case a wedged
1179 // adapter never returns. `max_delay * num_workers` is the
1180 // worst-case shape (one partially-filled batch per worker,
1181 // each waiting its full `max_delay` to time out), capped at
1182 // 2 s — same upper bound as before.
1183 //
1184 // Read the worker count via the shard manager's atomic-
1185 // backed `num_shards()` rather than `batch_workers.lock()
1186 // .len()`. The previous spinlock-backed `.lock()` inside
1187 // an `async fn` could stall the runtime worker thread
1188 // under contention with concurrent `add_shard_internal` /
1189 // `remove_shard_internal` callers; the atomic accessor is
1190 // both faster and async-safe. Mismatch in the
1191 // worker-count vs shard-count snapshot only changes the
1192 // phase2 deadline by at most one `max_delay` step, which
1193 // is bounded by the outer 2s cap regardless.
1194 let n_workers = usize::from(self.shard_manager.num_shards());
1195 let phase2_budget = self
1196 .config
1197 .batch
1198 .max_delay
1199 .saturating_mul(n_workers.max(1) as u32);
1200 let phase2_deadline =
1201 tokio::time::Instant::now() + phase2_budget.min(Duration::from_secs(2));
1202
1203 // Inner poll cadence: re-check the counters every 1 ms (or
1204 // `max_delay / 16`, whichever is larger). The fast cadence
1205 // means we exit promptly once the BatchWorker dispatches,
1206 // rather than waking exactly once per `max_delay` and
1207 // potentially racing the dispatch by a few ms.
1208 let poll_interval = (self.config.batch.max_delay / 16).max(Duration::from_millis(1));
1209 loop {
1210 let dispatched = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1211 let dropped = self.stats.events_dropped.load(AtomicOrdering::Acquire);
1212 // The barrier: every event ingested pre-flush has been
1213 // either dispatched or dropped. The bus's invariant
1214 // `events_ingested = events_dispatched + events_dropped`
1215 // holds at quiescence; we wait until
1216 // `dispatched + dropped >= target_ingested`.
1217 //
1218 // Pre-fix this used `dispatched + (dropped -
1219 // dropped_at_start) >= target_ingested`, which under-
1220 // counted by `dropped_at_start`: even after every
1221 // pre-flush event was processed, the inequality
1222 // required `dropped_at_start` MORE post-flush events
1223 // before signalling done. Workloads with no post-flush
1224 // ingest hung at the barrier until the deadline fired.
1225 //
1226 // Cross-shard race remains: a fast shard's post-flush
1227 // dispatches can satisfy the global target while a
1228 // slow shard's pre-flush events linger in its mpsc
1229 // channel or pending batch. `all_shards_empty()`
1230 // checks ring buffers but not those downstream
1231 // queues. Operators relying on flush as a hard
1232 // delivery barrier should call it during quiet
1233 // ingest, or in `shutdown` (which gates ingest via
1234 // `try_enter_ingest`).
1235 let _ = dispatched_at_start; // reserved for future per-shard accounting
1236 if dispatched.saturating_add(dropped) >= target_ingested
1237 && self.shard_manager.all_shards_empty()
1238 {
1239 break;
1240 }
1241 if tokio::time::Instant::now() >= phase2_deadline {
1242 tracing::warn!(
1243 target = target_ingested,
1244 dispatched,
1245 dropped = dropped.saturating_sub(dropped_at_start),
1246 "flush: Phase 2 deadline reached before all ingested events were dispatched",
1247 );
1248 break;
1249 }
1250 tokio::time::sleep(poll_interval).await;
1251 }
1252
1253 // Phase 3: tell the adapter to flush whatever it has
1254 // buffered. Bounded by `adapter_timeout` so a hanging
1255 // adapter can't pin us forever.
1256 match tokio::time::timeout(self.config.adapter_timeout, self.adapter.flush()).await {
1257 Ok(r) => r,
1258 Err(_) => {
1259 tracing::warn!(
1260 "flush: adapter.flush timed out after {:?}",
1261 self.config.adapter_timeout
1262 );
1263 Err(AdapterError::Fatal("adapter flush timed out".into()))
1264 }
1265 }
1266 }
1267
1268 /// Gracefully shut down the event bus.
1269 ///
1270 /// The shutdown order is load-bearing:
1271 ///
1272 /// 1. Signal `shutdown` so drain workers stop pulling from
1273 /// ring buffers after their final sweep.
1274 /// 2. Await **drain workers** so every event the producer
1275 /// has handed to the bus is now in the per-shard mpsc
1276 /// channel.
1277 /// 3. Drop `batch_senders` so each channel's last sender is
1278 /// gone — the next `recv().await` in a batch worker will
1279 /// return `None`.
1280 /// 4. Await **batch workers**, which drain everything
1281 /// remaining in their channel and exit on `recv() = None`.
1282 ///
1283 /// Reversing steps 2 and 4 (the previous design) silently
1284 /// dropped events: a batch worker that exited on the shutdown
1285 /// flag could leave events the drain worker pushed *after* its
1286 /// `try_recv` sweep stranded in the channel.
1287 pub async fn shutdown(self) -> Result<(), AdapterError> {
1288 self.shutdown_via_ref().await
1289 }
1290
1291 /// Shutdown via shared reference — same semantics as
1292 /// [`shutdown`](Self::shutdown), but does not consume `self`.
1293 ///
1294 /// Useful for callers that hold the bus behind `Arc<EventBus>`
1295 /// (e.g., the SDK, where `subscribe` perpetuates an Arc clone
1296 /// into every `EventStream`) and therefore cannot satisfy
1297 /// `Arc::try_unwrap`. Idempotent: the first caller does the
1298 /// work; concurrent or subsequent callers wait for the
1299 /// `shutdown_completed` flag and return `Ok(())`.
1300 pub async fn shutdown_via_ref(&self) -> Result<(), AdapterError> {
1301 // 1. CAS the shutdown flag false→true. SeqCst pairs with
1302 // `try_enter_ingest`'s shutdown check — any producer that
1303 // observed the previous `false` and is mid-push has its
1304 // `in_flight_ingests` increment ordered before this store
1305 // (the CAS-success branch is a release of the new `true`),
1306 // and so will be visible to the wait below.
1307 //
1308 // If the CAS loses (someone else — typically a concurrent
1309 // call or `Drop` — already flipped the flag), spin until
1310 // they finish. We can't run the rest of the body because
1311 // workers/senders may already be partially torn down.
1312 if self
1313 .shutdown
1314 .compare_exchange(false, true, AtomicOrdering::SeqCst, AtomicOrdering::SeqCst)
1315 .is_err()
1316 {
1317 // Bound the wait so a `Drop`-only path (which sets
1318 // `shutdown=true` but never sets `shutdown_completed`)
1319 // doesn't spin forever.
1320 //
1321 // Distinguish the two outcomes for callers. If
1322 // `shutdown_completed` flips inside the window, we
1323 // return `Ok(())` and the caller can be sure the first
1324 // caller finished. If the deadline fires first, we
1325 // surface `AdapterError::Transient(_)` — the bus IS
1326 // being shut down (the flag is set), but completion is
1327 // not yet observable; the caller can treat this as
1328 // "another thread is working on it, retry the
1329 // is_shutdown_completed() poll if you need a hard
1330 // barrier."
1331 //
1332 // Returning `Ok(())` in both branches would let
1333 // shutdown-done assumptions silently drift under a slow
1334 // adapter (`adapter_timeout` default 30 s > the 10 s
1335 // spin deadline), letting subsequent code observe a
1336 // partially-shut-down bus.
1337 // 10s in production builds; overridable via the
1338 // `_TEST_OVERRIDE_SHUTDOWN_VIA_REF_DEADLINE` thread-local
1339 // in test builds so the slow-first-caller test doesn't
1340 // need to wall-clock-wait for the full deadline. The
1341 // override is `#[cfg(test)]`-only; production cargo
1342 // builds compile out the override entirely.
1343 let deadline_dur = shutdown_via_ref_spin_deadline();
1344 // Use `tokio::time::Instant` so tests using
1345 // `tokio::time::pause()` virtualize this clock too.
1346 // Pre-fix `std::time::Instant` was wall-clock and
1347 // ignored `pause()`, breaking timeout-bounded tests
1348 // that wanted to fast-forward the spin deadline.
1349 let deadline = tokio::time::Instant::now() + deadline_dur;
1350 while !self.shutdown_completed.load(AtomicOrdering::Acquire) {
1351 if tokio::time::Instant::now() >= deadline {
1352 return Err(AdapterError::Transient(
1353 "shutdown_via_ref: another caller is mid-shutdown; \
1354 deadline elapsed before shutdown_completed \
1355 flipped. The bus IS shutting down; poll \
1356 is_shutdown_completed() if you need a hard \
1357 barrier."
1358 .into(),
1359 ));
1360 }
1361 // `yield_now` re-queues immediately and keeps the
1362 // task hot, starving the workers we're waiting on
1363 // under contention. A short `sleep` parks the task
1364 // and lets the runtime schedule the workers.
1365 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1366 }
1367 return Ok(());
1368 }
1369
1370 // 1a. Wait for in-flight ingests to drain BEFORE the drain
1371 // workers do their final ring-buffer sweep. Otherwise a
1372 // producer that observed `shutdown=false` could push *after*
1373 // the drain worker's last `pop_batch_into` returned zero,
1374 // leaving the event stranded in the ring buffer when the bus
1375 // is dropped.
1376 //
1377 // This is bounded: every producer either bails on the
1378 // SeqCst-synchronized shutdown check (no progress past the
1379 // increment) or completes its single non-blocking push and
1380 // decrements. Both paths take constant time; the total
1381 // wait is O(producer threads).
1382 //
1383 // The "every observed in-flight ingest completes before
1384 // the final sweep" property holds under normal conditions,
1385 // but the 5-second deadline below forces the gate open
1386 // even when producers are still in their push window. A
1387 // producer that has incremented `in_flight_ingests` (and
1388 // so observed `shutdown=false`) but whose push is delayed
1389 // past the deadline (heavy contention, debugger hit, etc.)
1390 // will complete its push AFTER the final sweep — its event
1391 // lands in the ring buffer and is never read. The deadline
1392 // exists so a stuck producer can't deadlock shutdown
1393 // indefinitely; the trade-off is documented data loss past
1394 // the 5 s window, surfaced via the `events_dropped` stat
1395 // (so the loss is observable to operators) and the `WARN`
1396 // log below (so it's diagnosable). The "no stranding"
1397 // promise on the happy path stands; the deadline path is
1398 // the documented escape hatch.
1399 // Use `tokio::time::Instant` so tests using
1400 // `tokio::time::pause()` virtualize this 5-second
1401 // deadline too — pre-fix the `std::time::Instant`
1402 // was wall-clock and ignored the test's paused clock.
1403 let in_flight_deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(5);
1404 // Snapshot of `(stranded, ingested, dispatched)` at the
1405 // deadline — Some(...) iff we hit the deadline path. The
1406 // post-drain reconciliation reads this to compute the
1407 // actual drop count (see comment further down).
1408 let mut deadline_snapshot: Option<(u64, u64, u64)> = None;
1409 while self.in_flight_ingests.load(AtomicOrdering::SeqCst) > 0 {
1410 if tokio::time::Instant::now() >= in_flight_deadline {
1411 let stranded = self.in_flight_ingests.load(AtomicOrdering::SeqCst);
1412 let ingested_now = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1413 let dispatched_now = self.stats.events_dispatched.load(AtomicOrdering::Acquire);
1414 tracing::warn!(
1415 in_flight = stranded,
1416 lossy = true,
1417 "shutdown timed out waiting for in-flight ingests after 5s; \
1418 proceeding — up to {} events may strand in the ring buffer \
1419 past final drain (documented data-loss path)",
1420 stranded,
1421 );
1422 // Set the lossy flag immediately so a fast `is_*`
1423 // poll observes the outcome before the drain
1424 // finishes. The actual `events_dropped` bump is
1425 // deferred until after the final drain runs (see
1426 // "post-drain reconciliation" below) so we don't
1427 // double-count events that the drain still
1428 // successfully delivers — pre-fix this bumped
1429 // `events_dropped += stranded` here and the same
1430 // events that the final sweep then drained landed
1431 // in BOTH `events_ingested` and `events_dropped`,
1432 // breaking the bus's
1433 // `ingested == dispatched + dropped` invariant
1434 // and turning `shutdown_was_lossy` into a false
1435 // positive on every deadline-triggered shutdown.
1436 self.stats
1437 .shutdown_was_lossy
1438 .store(true, AtomicOrdering::Release);
1439 deadline_snapshot = Some((stranded, ingested_now, dispatched_now));
1440 break;
1441 }
1442 // Park instead of `yield_now`. The producers we're
1443 // waiting on contend for the same runtime threads;
1444 // re-queuing immediately starves their progress.
1445 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
1446 }
1447
1448 // 1b. Release the drain-finalize gate.
1449 //
1450 // Pre-fix used `Ordering::Release` for this store
1451 // and relied on the SeqCst spin above (loading
1452 // `in_flight_ingests`) to provide the happens-before for
1453 // every observed-pre-shutdown push. That works today
1454 // because SeqCst loads carry an implicit fence, but a
1455 // future change to the spin's ordering (Relaxed for perf,
1456 // say) would silently break the drain worker's final-sweep
1457 // contract — producer pushes might not be visible to
1458 // `pop_batch_into`. Promote to SeqCst so the load-bearing
1459 // happens-before is explicit at this site, not derived
1460 // from another atomic's ordering choice.
1461 //
1462 // Caveat: the SeqCst happens-before above only covers the
1463 // *non-deadline* exit from the spin (the loop condition
1464 // observed `in_flight_ingests == 0`). On the deadline-break
1465 // path the loop exits with `in_flight_ingests > 0` — the
1466 // outstanding producer pushes have NOT been observed
1467 // synchronized-with this thread, and the SeqCst store below
1468 // does not retroactively create a happens-before with
1469 // pushes that are still mid-flight. Those events are
1470 // exactly the "stranded" events accounted via
1471 // `events_dropped` and `shutdown_was_lossy`; the contract
1472 // is "every observed-pre-shutdown push is visible to the
1473 // final sweep on the happy path; on the deadline path,
1474 // up to `stranded` events past the gate are surfaced as
1475 // dropped". The flag at line 1271 (`shutdown_was_lossy`)
1476 // is the operator-visible signal that this contract was
1477 // exercised on the lossy branch.
1478 self.drain_finalize_ready
1479 .store(true, AtomicOrdering::SeqCst);
1480
1481 // Stop the scaling monitor first — it's independent of the
1482 // ingestion path and just needs to observe the flag.
1483 let scaling_handle = self.scaling_monitor.lock().take();
1484 if let Some(handle) = scaling_handle {
1485 let _ = handle.await;
1486 }
1487
1488 // Take workers without holding the lock across await.
1489 let workers = std::mem::take(&mut *self.batch_workers.lock());
1490
1491 // 2. Await drain workers. Each one pops a final batch (up
1492 // to 10k events) from its ring buffer, sends it on the
1493 // channel, and exits. After this loop, every event in
1494 // the ring buffers has been pushed to its channel.
1495 //
1496 // `join_all` lets the runtime overlap drain handles. A
1497 // sequential `for ... { drain.await; }` would serialize
1498 // shutdown wall-clock as N×T instead of max(T), which on
1499 // the default 1024-shard config × per-shard final-drain
1500 // time makes shutdown painful.
1501 let (drains, batch_handles): (Vec<_>, Vec<_>) = workers
1502 .into_iter()
1503 .map(|(shard_id, ShardWorkers { batch, drain, .. })| {
1504 ((shard_id, drain), (shard_id, batch))
1505 })
1506 .unzip();
1507
1508 // Surface drain-worker JoinErrors explicitly. The default
1509 // Tokio runtime does NOT log spawned-task panics, so a
1510 // `let _ = join_all(...)` would silently swallow a panic
1511 // and mask stranded events. tracing::error per failure
1512 // makes the incident grep-able post-mortem.
1513 let drain_handles: Vec<_> = drains.into_iter().map(|(_, h)| h).collect();
1514 let drain_ids: Vec<u16> = batch_handles.iter().map(|(id, _)| *id).collect();
1515 for (shard_id, result) in drain_ids
1516 .iter()
1517 .copied()
1518 .zip(futures::future::join_all(drain_handles).await)
1519 {
1520 if let Err(e) = result {
1521 tracing::error!(
1522 shard_id,
1523 error = %e,
1524 "drain worker JoinHandle errored on shutdown await"
1525 );
1526 }
1527 }
1528
1529 // 3. Drop the original senders so the channels close once
1530 // drain-worker sender clones (already dropped above)
1531 // are gone too. Without this, batch workers would block
1532 // on `recv().await` forever.
1533 drop(std::mem::take(&mut *self.batch_senders.write()));
1534
1535 // 4. Await batch workers. They drain their channel until
1536 // `recv() = None`, flush, and exit.
1537 //
1538 // Same parallelization as the drain phase, with the same
1539 // explicit JoinError surfacing.
1540 let batch_only: Vec<_> = batch_handles.into_iter().map(|(_, h)| h).collect();
1541 for (shard_id, result) in drain_ids
1542 .into_iter()
1543 .zip(futures::future::join_all(batch_only).await)
1544 {
1545 if let Err(e) = result {
1546 tracing::error!(
1547 shard_id,
1548 error = %e,
1549 "BatchWorker JoinHandle errored on shutdown await"
1550 );
1551 }
1552 }
1553
1554 // Flush and shutdown adapter (with timeout to prevent hanging)
1555 let timeout = self.config.adapter_timeout;
1556 if tokio::time::timeout(timeout, self.adapter.flush())
1557 .await
1558 .is_err()
1559 {
1560 tracing::error!("Adapter flush timed out during shutdown");
1561 }
1562 let result = tokio::time::timeout(timeout, self.adapter.shutdown())
1563 .await
1564 .map_err(|_| AdapterError::Fatal("adapter shutdown timed out".into()))?;
1565
1566 // Post-drain reconciliation for the lossy-shutdown path.
1567 //
1568 // If we hit the in-flight deadline above, `deadline_snapshot`
1569 // holds `(stranded, ingested@deadline, dispatched@deadline)`.
1570 // Some of those `stranded` producers' events landed in the
1571 // ring AFTER our deadline check but BEFORE the
1572 // `drain_finalize_ready` gate flipped — those events are
1573 // now successfully ingested, drained, and dispatched through
1574 // the adapter. They appear in `events_dispatched` (the
1575 // delta since the deadline), so:
1576 //
1577 // actual_drops = stranded
1578 // - (dispatched_after_drain - dispatched@deadline)
1579 // - (ingested_after_drain - ingested@deadline)
1580 // only counting events that landed but
1581 // weren't dispatched (dropped under
1582 // backpressure, etc.)
1583 //
1584 // The cleaner reconciliation: events that completed
1585 // `try_enter_ingest` AFTER the deadline either completed
1586 // ingest (bumping `events_ingested`) or were dropped on
1587 // backpressure (bumping `events_dropped` from the existing
1588 // backpressure paths). The `stranded - delta_ingested`
1589 // remainder is producers whose `try_enter_ingest` succeeded
1590 // but never reached `shard_manager.ingest()` — those are
1591 // the genuinely-lost events we should account for.
1592 if let Some((stranded, ingested_at_deadline, _dispatched_at_deadline)) = deadline_snapshot {
1593 let ingested_after = self.stats.events_ingested.load(AtomicOrdering::Acquire);
1594 let post_deadline_ingests = ingested_after.saturating_sub(ingested_at_deadline);
1595 // Known under-count window: a producer that completed
1596 // `shard_manager.ingest()` and bumped `events_ingested`
1597 // just BEFORE its `IngestGuard` decremented
1598 // `in_flight_ingests` will be counted in
1599 // `ingested_at_deadline` (already past the bump) AND in
1600 // `stranded` (still pending the decrement on that same
1601 // spin). That single event contributes `+1` to both
1602 // sides of the subtraction, so the saturating_sub
1603 // under-counts the drop by 1 per such interleaving.
1604 // The opposite direction (producer in-flight at
1605 // deadline that never completed ingest) is correctly
1606 // counted as a drop, so the bias is one-sided and
1607 // small (bounded by the number of producers mid-push
1608 // at the exact deadline moment — typically 0–1 even
1609 // under heavy load). Operators reading
1610 // `shutdown_was_lossy` get the right boolean; the
1611 // numeric `events_dropped` may under-count by a few
1612 // events on a lossy shutdown. Acceptable; the cost of
1613 // closing the window is paired-SeqCst reloads under
1614 // the spin which would dominate the shutdown path.
1615 let actual_drops = stranded.saturating_sub(post_deadline_ingests);
1616 if actual_drops > 0 {
1617 self.stats
1618 .events_dropped
1619 .fetch_add(actual_drops, AtomicOrdering::Relaxed);
1620 } else {
1621 // The deadline tripped the eager
1622 // `shutdown_was_lossy = true` set above, but the
1623 // final drain ingested every stranded event so
1624 // nothing was actually lost. Clear the flag so
1625 // operator dashboards alerting on
1626 // `was_lossy && events_dropped == 0` don't see a
1627 // false positive. Pre-fix the boolean stayed
1628 // `true` against `events_dropped == 0` for any
1629 // deadline-triggered shutdown whose drain
1630 // happened to catch up.
1631 self.stats
1632 .shutdown_was_lossy
1633 .store(false, AtomicOrdering::Release);
1634 }
1635 tracing::warn!(
1636 stranded_at_deadline = stranded,
1637 post_deadline_ingests,
1638 actual_drops,
1639 "lossy shutdown reconciled: post-drain `events_dropped` bumped \
1640 by stranded - post-deadline-ingests (pre-fix this bumped by \
1641 the full `stranded` count, double-counting events the drain \
1642 still successfully delivered)",
1643 );
1644 }
1645
1646 // Mark shutdown as completed so Drop knows not to warn.
1647 self.shutdown_completed.store(true, AtomicOrdering::Release);
1648 result
1649 }
1650
1651 /// True once `shutdown` / `shutdown_via_ref` has signaled — does
1652 /// not imply the shutdown work has finished. Use
1653 /// [`is_shutdown_completed`](Self::is_shutdown_completed) for
1654 /// completion.
1655 pub fn is_shutdown(&self) -> bool {
1656 self.shutdown.load(AtomicOrdering::Acquire)
1657 }
1658
1659 /// True once `shutdown` / `shutdown_via_ref` has fully drained
1660 /// workers and the adapter shutdown returned (success path only).
1661 pub fn is_shutdown_completed(&self) -> bool {
1662 self.shutdown_completed.load(AtomicOrdering::Acquire)
1663 }
1664
1665 /// Get shard metrics (if dynamic scaling is enabled).
1666 pub fn shard_metrics(&self) -> Option<Vec<ShardMetrics>> {
1667 self.shard_manager.collect_metrics()
1668 }
1669
1670 /// Check if dynamic scaling is enabled.
1671 pub fn is_dynamic_scaling_enabled(&self) -> bool {
1672 self.config.scaling.is_some()
1673 }
1674
1675 /// Manually trigger a scale-up (for testing or manual intervention).
1676 ///
1677 /// Bypasses the auto-scaling cooldown so a deliberate operator
1678 /// request isn't rate-limited by the auto-scaling cadence.
1679 /// Pre-fix this looped `add_shard_internal()` N times, each
1680 /// of which bumped `last_scaling`, so iteration 1+ failed
1681 /// with `InCooldown` against any non-zero cooldown — the
1682 /// first shard was left half-added (workers spawned, routing
1683 /// entry installed) while the error propagated to the
1684 /// caller. The `max_shards` budget check still applies.
1685 pub async fn manual_scale_up(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1686 let mut new_ids = Vec::with_capacity(count as usize);
1687 for _ in 0..count {
1688 let id = self.add_shard_internal_force().await?;
1689 new_ids.push(id);
1690 }
1691 Ok(new_ids)
1692 }
1693
1694 /// Manually trigger a scale-down (for testing or manual intervention).
1695 ///
1696 /// Marks `count` shards as `Draining`, waits for them to empty,
1697 /// finalizes them to `Stopped`, and removes them from the
1698 /// routing table — mirroring the scaling monitor's per-tick
1699 /// finalize loop. Returns the IDs of shards that were
1700 /// successfully drained AND removed (subset of those marked
1701 /// Draining if any failed to empty within the deadline).
1702 ///
1703 /// Drives the full scale-down lifecycle synchronously: a
1704 /// plain `mapper.scale_down` call marks shards `Draining` but
1705 /// does NOT finalize them — finalization is the scaling
1706 /// monitor's responsibility. Bus configs without an active
1707 /// monitor (or callers that shut down before the monitor's
1708 /// next tick) would otherwise lose any events queued in those
1709 /// shards' ring buffers.
1710 pub async fn manual_scale_down(&self, count: u16) -> Result<Vec<u16>, AdapterError> {
1711 let mapper = self
1712 .shard_manager
1713 .mapper()
1714 .ok_or_else(|| AdapterError::Fatal("Dynamic scaling not enabled".into()))?;
1715
1716 let drained_ids = mapper
1717 .scale_down(count)
1718 .map_err(|e| AdapterError::Fatal(e.to_string()))?;
1719
1720 // `finalize_draining` requires the shard to have been
1721 // Draining for >100ms with an empty ring buffer and no
1722 // pushes since drain start. Poll until every requested
1723 // shard finalizes, capped by an outer deadline so a wedged
1724 // producer can't pin this method forever. Use
1725 // `tokio::time::Instant` so tests under `tokio::time::pause()`
1726 // advance the virtual clock via `sleep` rather than spinning
1727 // until wall-clock catches up.
1728 let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(2);
1729 let mut finalized: std::collections::HashSet<u16> = std::collections::HashSet::new();
1730 let target: std::collections::HashSet<u16> = drained_ids.iter().copied().collect();
1731
1732 while finalized.len() < target.len() && tokio::time::Instant::now() < deadline {
1733 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1734 let stopped = mapper.finalize_draining();
1735 // `finalize_draining` is destructive — every qualifying
1736 // Draining shard transitions to Stopped in one shot,
1737 // regardless of who initiated the drain. Pre-fix the
1738 // `if target.contains(&shard_id)` filter dropped non-
1739 // target ids on the floor; if the scaling monitor (or
1740 // a parallel `manual_scale_down` on a different target
1741 // set) finalized one of THEIR shards in the same tick,
1742 // that shard ended up Stopped with workers + routing
1743 // entry intact — leaked. Always tear down via
1744 // `remove_shard_internal` so the bus-side state
1745 // (workers, sender, routing) is freed; only count the
1746 // target subset toward the returned Vec.
1747 for shard_id in stopped {
1748 let _ = self.remove_shard_internal(shard_id).await;
1749 if target.contains(&shard_id) {
1750 finalized.insert(shard_id);
1751 }
1752 }
1753 }
1754
1755 // Surface partial success at WARN level. The return shape
1756 // is preserved for compat (changing to `(Vec, Vec)` would
1757 // break the existing callers + test) so a smaller-than-
1758 // target list could otherwise be silently mistaken for full
1759 // success; the WARN log gives operations tooling something
1760 // to alert on.
1761 if finalized.len() < target.len() {
1762 let still_draining: Vec<u16> = target.difference(&finalized).copied().collect();
1763 tracing::warn!(
1764 requested = target.len(),
1765 finalized = finalized.len(),
1766 still_draining = ?still_draining,
1767 "manual_scale_down deadline elapsed before all targeted \
1768 shards finalized — events still in-flight on the listed \
1769 shards. They will finalize on the next scaling-monitor \
1770 tick or on shutdown."
1771 );
1772 }
1773
1774 Ok(finalized.into_iter().collect())
1775 }
1776}
1777
impl Drop for EventBus {
    /// Best-effort teardown for a bus that is dropped.
    ///
    /// `Drop` cannot await, so this only:
    /// 1. raises `shutdown` and releases the `drain_finalize_ready` gate,
    ///    so background drain workers can finish their final sweep and
    ///    exit; and
    /// 2. when `shutdown_completed` is still false, counts events left in
    ///    the shard ring buffers as dropped and logs a data-loss warning.
    fn drop(&mut self) {
        // Signal shutdown. Drain workers and the scaling monitor poll this
        // flag and exit once they see it. Batch workers do NOT poll it:
        // they exit when their mpsc channel closes (see the loop in
        // `spawn_batch_worker`).
        //
        // SeqCst matches the ordering `try_enter_ingest` and `shutdown()`
        // use on the same flag. `&mut self` already rules out a concurrent
        // ingest today, so this is defensive: it keeps the lifecycle
        // consistent if a future change ever lets `Drop` overlap an
        // in-flight `try_enter_ingest`.
        self.shutdown.store(true, AtomicOrdering::SeqCst);
        // Release the drain-finalize gate as well. A drain worker that has
        // already seen `shutdown == true` is parked on this gate; without
        // the store it would wait out `DRAIN_FINALIZE_TIMEOUT` before its
        // final sweep. Durability is best-effort only: Drop skips the
        // in-flight-ingest wait that `shutdown()` performs, so a push that
        // lands after this point can still be lost.
        self.drain_finalize_ready
            .store(true, AtomicOrdering::SeqCst);

        // Workers do NOT hold `Arc<EventBus>`. Drain workers get their own
        // `Arc<ShardManager>`, sender, and `Arc<AtomicBool>` flags. Batch
        // workers get their own `Arc<dyn Adapter>` / `Arc<ShardManager>` /
        // stats Arcs plus the receiver. The scaling monitor only holds a
        // `Weak<EventBus>`. So those tasks keep running safely after this
        // value is gone; nothing on the worker side touches a dropped
        // EventBus field.
        //
        // Exit paths:
        //   * drain workers see `shutdown`, pass the gate released above,
        //     do a final ring sweep, and exit;
        //   * batch workers exit once their channel closes, i.e. after
        //     both the drain worker's sender and this bus's own
        //     `batch_senders` (dropped along with the struct's fields
        //     after this fn) are gone;
        //   * the scaling monitor exits when its `Weak` upgrade fails or
        //     it observes `shutdown`.
        //
        // Loss accounting. `shutdown_completed` is set only at the very
        // end of a successful `shutdown()`, after the adapter's
        // `shutdown()` returned. This branch therefore runs both when
        // shutdown was never awaited and when it returned early with an
        // error (e.g. the adapter-shutdown timeout path). In either case
        // the adapter's `flush()`/`shutdown()` did not complete, so
        // durable backends may never see the remaining events. We can't
        // fix that from `Drop` (no async), but we surface the risk loudly.
        if !self.shutdown_completed.load(AtomicOrdering::Acquire) {
            // Count events still sitting in shard ring buffers into
            // `events_dropped`, and set `shutdown_was_lossy`, so the
            // counter view and the boolean view agree. Operators alerting
            // on `events_dropped > 0` would otherwise miss the incident.
            //
            // NOTE(review): the drain worker's final sweep may still
            // forward these ring events to the batch workers, which can
            // hand them to `adapter.on_batch`. They are counted as dropped
            // here because the adapter's flush()/shutdown() never runs
            // from this path, so delivery is not guaranteed. Confirm this
            // is the intended accounting; it can overlap with
            // `events_dispatched`.
            //
            // Events already in the BatchWorker mpsc channels or in a
            // worker's pending batch are not included: Drop has no
            // synchronous way to enumerate them.
            //
            // Use the non-blocking accessor. If `Drop` runs on a thread
            // that already holds a shard mutex (single-thread runtime +
            // panic during shutdown is the canonical hazard), the blocking
            // `total_pending_in_rings` would self-deadlock. We would rather
            // under-report stranded events than wedge the drop forever;
            // shards that could not be locked are reported separately.
            let (stranded_in_rings, uncounted_shards) =
                self.shard_manager.try_total_pending_in_rings();
            if uncounted_shards > 0 {
                tracing::warn!(
                    uncounted_shards,
                    "EventBus::drop: {uncounted_shards} shard(s) were locked at \
                     drop time and could not be accounted for in stranded_in_rings"
                );
            }
            if stranded_in_rings > 0 {
                self.stats
                    .events_dropped
                    .fetch_add(stranded_in_rings, AtomicOrdering::Relaxed);
                self.stats
                    .shutdown_was_lossy
                    .store(true, AtomicOrdering::Release);
            }

            // NOTE(review): these counters come from
            // `self.shard_manager.stats()`, which may be a different
            // counter set from `self.stats` bumped just above. If so, the
            // logged `events_dropped` won't include `stranded_in_rings`.
            // Confirm.
            let stats = self.shard_manager.stats();
            tracing::warn!(
                events_ingested = stats.events_ingested,
                events_dropped = stats.events_dropped,
                stranded_in_rings,
                "EventBus dropped without an awaited shutdown(). Any in-flight \
                 events still in the ring buffers or batch channels will be lost \
                 — the adapter's flush()/shutdown() never ran. Call \
                 `bus.shutdown().await` before dropping for durable shutdown."
            );
        }
    }
}
1880
/// Deadline for the second-caller spin in `shutdown_via_ref`.
/// Fixed at 10 seconds in non-test builds.
#[cfg(not(test))]
fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
    std::time::Duration::from_millis(10_000)
}
1887
/// Test-only override for the `shutdown_via_ref` spin deadline, in
/// milliseconds. The initial value `0` means "no override, use the
/// production 10s". Tests set it through
/// [`set_shutdown_via_ref_spin_deadline_for_test`] to reach the
/// deadline-elapsed path without waiting the full 10s.
///
/// This is a single global shared by every test in the
/// `cargo test --lib` binary, so concurrent tests could leak overrides
/// into each other. Any test that sets it MUST hold the
/// [`SHUTDOWN_DEADLINE_OVERRIDE_GUARD`] mutex from the set through the
/// reset.
#[cfg(test)]
static SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS: AtomicU64 = AtomicU64::new(0);
1904
/// Mutex that tests hold while they override the shutdown-via-ref spin
/// deadline, so concurrently running tests never see each other's
/// transient override values. Acquire it through
/// [`shutdown_deadline_override_lock`] and keep the guard until the
/// override is reset to 0.
///
/// This is a `tokio::sync::Mutex` on purpose. The guard is held across
/// `.await` points for the whole guarded test, which is fine for tokio's
/// mutex. With `std::sync::Mutex` it would trip
/// `clippy::await_holding_lock`, and that lint is right to fire for
/// production code.
#[cfg(test)]
static SHUTDOWN_DEADLINE_OVERRIDE_GUARD: tokio::sync::Mutex<()> =
    tokio::sync::Mutex::const_new(());
1918
/// Lock the mutex guarding the deadline-override static. Tests keep the
/// returned guard across both setting and resetting the override, so
/// other tests always observe the default.
#[cfg(test)]
pub(crate) async fn shutdown_deadline_override_lock() -> tokio::sync::MutexGuard<'static, ()> {
    tokio::sync::Mutex::lock(&SHUTDOWN_DEADLINE_OVERRIDE_GUARD).await
}
1927
/// Test-build spin deadline. Returns the millisecond override from
/// `SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS` when it is non-zero, otherwise
/// the production 10 seconds.
#[cfg(test)]
fn shutdown_via_ref_spin_deadline() -> std::time::Duration {
    match SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS.load(std::sync::atomic::Ordering::Relaxed) {
        0 => std::time::Duration::from_secs(10),
        override_ms => std::time::Duration::from_millis(override_ms),
    }
}
1937
/// Set the test-only spin-deadline override in milliseconds. Passing `0`
/// restores the production default. Hold the guard from
/// `shutdown_deadline_override_lock` while an override is in effect.
#[cfg(test)]
pub(crate) fn set_shutdown_via_ref_spin_deadline_for_test(ms: u64) {
    let slot = &SHUTDOWN_VIA_REF_DEADLINE_OVERRIDE_MS;
    slot.store(ms, std::sync::atomic::Ordering::Relaxed);
}
1942
1943async fn run_scaling_monitor_via_weak(weak: std::sync::Weak<EventBus>) {
1944 // Refresh `interval` from the policy on every tick. The previous
1945 // version cached it once at task start, so any future runtime
1946 // policy update would not be adopted by the monitor without a
1947 // process restart. Today `EventBusConfig` is immutable
1948 // post-construction so this is a no-op — but reading it each tick
1949 // is cheap (one atomic / RwLock read) and removes the latent
1950 // footgun.
1951 loop {
1952 let interval = match weak.upgrade() {
1953 Some(bus) => match &bus.config.scaling {
1954 Some(p) => p.metrics_window,
1955 None => return,
1956 },
1957 None => return,
1958 };
1959 tokio::time::sleep(interval).await;
1960
1961 let bus = match weak.upgrade() {
1962 Some(b) => b,
1963 // Last strong ref dropped — caller is shutting down (or
1964 // already gone). Exit cleanly.
1965 None => break,
1966 };
1967
1968 // SeqCst to match the writer side (`EventBus::shutdown` /
1969 // `Drop`). The Acquire/Release handshake on
1970 // `drain_finalize_ready` already provides the load-bearing
1971 // happens-before today — but a future code change that
1972 // piggybacks on `shutdown`'s ordering (e.g. a producer that
1973 // observes shutdown without going through
1974 // `try_enter_ingest`) would silently break under Relaxed.
1975 // Aligning the read-side ordering with the writer-side
1976 // SeqCst is a one-instruction tax for the safety.
1977 if bus.shutdown.load(AtomicOrdering::SeqCst) {
1978 break;
1979 }
1980
1981 // Collect metrics for observability.
1982 if let Some(metrics) = bus.shard_manager.collect_metrics() {
1983 for m in &metrics {
1984 if m.fill_ratio > 0.5 {
1985 tracing::debug!(
1986 shard_id = m.shard_id,
1987 fill_ratio = m.fill_ratio,
1988 event_rate = m.event_rate,
1989 "Shard metrics"
1990 );
1991 }
1992 }
1993 }
1994
1995 // Evaluate scaling.
1996 match bus.shard_manager.evaluate_scaling() {
1997 ScalingDecision::ScaleUp(count) => {
1998 tracing::info!(count = count, "Scaling up shards");
1999 for _ in 0..count {
2000 if let Err(e) = bus.add_shard_internal().await {
2001 tracing::error!(error = %e, "Failed to add shard");
2002 break;
2003 }
2004 }
2005 }
2006 ScalingDecision::ScaleDown(count) => {
2007 tracing::info!(count = count, "Scaling down shards");
2008 if let Some(mapper) = bus.shard_manager.mapper() {
2009 if let Ok(drained) = mapper.scale_down(count) {
2010 for shard_id in drained {
2011 let _ = bus.shard_manager.drain_shard(shard_id);
2012 }
2013 }
2014 }
2015 }
2016 ScalingDecision::None => {}
2017 }
2018
2019 if let Some(mapper) = bus.shard_manager.mapper() {
2020 let stopped = mapper.finalize_draining();
2021 for shard_id in stopped {
2022 let _ = bus.remove_shard_internal(shard_id).await;
2023 }
2024 }
2025
2026 // CRITICAL: drop the strong ref BEFORE the next sleep so a
2027 // concurrent `shutdown(self)` caller can `Arc::try_unwrap`
2028 // the last strong ref while we're sleeping.
2029 drop(bus);
2030 }
2031}
2032
/// Compute how long to sleep after dispatch attempt `attempt` (0-based)
/// fails with a retryable error, before the next attempt.
///
/// A flat retry delay makes every shard's BatchWorker retry on the same
/// cadence during a partial backend outage (Redis / JetStream slow but not
/// dead). That produces a synchronized retry storm which adds load while
/// the backend is recovering.
///
/// Instead this uses exponential backoff: 100, 200, 400, 800, 1600 and
/// 3200 ms for `attempt` 0..=5, and 3200 ms base for every later attempt.
/// Jitter decorrelates the shards. It lies in `[-25%, +25%)` of the base
/// and comes from hashing `(shard_id, attempt)`, so it is deterministic
/// for a given shard and attempt but differs across shards.
fn retry_backoff(shard_id: u16, attempt: u32) -> Duration {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    // 100 ms × 2^attempt, with the exponent capped at 5.
    let base_ms: u64 = 100u64.saturating_mul(1u64 << attempt.min(5));

    let mut hasher = DefaultHasher::new();
    shard_id.hash(&mut hasher);
    attempt.hash(&mut hasher);
    let h = hasher.finish();

    // `h % jitter_range` is in [0, base/2). Subtracting base/4 centers it
    // on zero, giving [-25%, +25%) of the base.
    let jitter_range = (base_ms / 2).max(1);
    let jitter = (h % jitter_range) as i64 - (jitter_range as i64 / 2);
    let final_ms = (base_ms as i64 + jitter).max(1) as u64;

    Duration::from_millis(final_ms)
}
2077
2078async fn dispatch_batch(
2079 adapter: &dyn Adapter,
2080 batch: Batch,
2081 shard_id: u16,
2082 timeout: Duration,
2083 retries: u32,
2084) -> bool {
2085 // Retry attempts clone the batch; the final attempt moves it, saving
2086 // one clone per dispatch (the common path is retries == 0).
2087 for attempt in 0..retries {
2088 match tokio::time::timeout(timeout, adapter.on_batch(batch.clone())).await {
2089 Ok(Ok(())) => return true,
2090 Ok(Err(e)) => {
2091 if !e.is_retryable() {
2092 // Tag with a `reason` field so this
2093 // distinct drop cause is separately filterable
2094 // from retry-exhausted and timeout in
2095 // observability tools. Shutdown is distinguished
2096 // from generic non-retryable so an operator
2097 // chasing "why are batches being dropped" can
2098 // immediately tell a sequencing bug (sending to
2099 // a stopped adapter) from a transport / config
2100 // failure.
2101 let reason = if e.is_shutdown() {
2102 "adapter_shutdown"
2103 } else {
2104 "non_retryable"
2105 };
2106 tracing::error!(
2107 shard_id,
2108 error = %e,
2109 attempt,
2110 reason,
2111 "Non-retryable error from adapter, dropping batch"
2112 );
2113 return false;
2114 }
2115 tracing::warn!(shard_id, error = %e, attempt, "Batch dispatch failed, retrying");
2116 }
2117 Err(_) => {
2118 tracing::warn!(shard_id, attempt, "Adapter on_batch timed out, retrying");
2119 }
2120 }
2121 tokio::time::sleep(retry_backoff(shard_id, attempt)).await;
2122 }
2123
2124 // Pre-fix the final attempt collapsed every drop into
2125 // one log line ("Failed to dispatch batch, dropping"), making
2126 // it impossible to tell retry-exhausted from fatal-non-
2127 // retryable from timeout in metrics. The non-retryable case
2128 // already has its own log inside the retry loop above (early
2129 // return); here we tag retry-exhausted vs timeout-after-
2130 // retries with distinct `reason` fields so log-based
2131 // observability tools can break the drops out by cause.
2132 match tokio::time::timeout(timeout, adapter.on_batch(batch)).await {
2133 Ok(Ok(())) => true,
2134 Ok(Err(e)) => {
2135 tracing::error!(
2136 shard_id,
2137 error = %e,
2138 reason = "retry_exhausted",
2139 attempts = retries + 1,
2140 "Failed to dispatch batch after exhausting retries, dropping"
2141 );
2142 false
2143 }
2144 Err(_) => {
2145 tracing::error!(
2146 shard_id,
2147 reason = "timeout",
2148 attempts = retries + 1,
2149 "Adapter on_batch timed out on final attempt, dropping batch"
2150 );
2151 false
2152 }
2153 }
2154}
2155
2156struct BatchWorkerParams {
2157 shard_id: u16,
2158 rx: mpsc::Receiver<Vec<crate::event::InternalEvent>>,
2159 adapter: Arc<dyn Adapter>,
2160 shard_manager: Arc<ShardManager>,
2161 config: BatchConfig,
2162 adapter_timeout: Duration,
2163 batch_retries: u32,
2164 /// Bus-owned mirror of `BatchWorker::next_sequence`. The worker
2165 /// stores its post-flush sequence here on every dispatch so the
2166 /// bus can read it after the worker exits — see
2167 /// `ShardWorkers::next_sequence` for the consumer side.
2168 next_sequence: Arc<AtomicU64>,
2169 /// Bus-level stats. The worker increments
2170 /// `batches_dispatched` and `events_dispatched` after every
2171 /// successful `dispatch_batch`. Both must actually be
2172 /// incremented here, otherwise `flush()`'s Phase 2 progress
2173 /// probe would always observe zero progress and early-break
2174 /// after a single `max_delay` window — racing the
2175 /// BatchWorker's first `recv_timeout` and flaking on
2176 /// Windows-class timer resolution.
2177 stats: Arc<EventBusStats>,
2178 /// Producer nonce stamped on every batch the worker emits.
2179 /// Bus-loaded from the persistent path when
2180 /// `producer_nonce_path` is configured, otherwise from the
2181 /// per-process default.
2182 producer_nonce: u64,
2183}
2184
2185fn spawn_batch_worker(params: BatchWorkerParams) -> JoinHandle<()> {
2186 let BatchWorkerParams {
2187 shard_id,
2188 mut rx,
2189 adapter,
2190 shard_manager,
2191 config,
2192 adapter_timeout,
2193 batch_retries,
2194 next_sequence,
2195 stats,
2196 producer_nonce,
2197 } = params;
2198 tokio::spawn(async move {
2199 let mut worker = BatchWorker::new(shard_id, config.clone(), next_sequence, producer_nonce);
2200
2201 loop {
2202 // Wait for events with timeout. The batch worker exits
2203 // only when its channel is closed — i.e. after every
2204 // upstream sender (the drain worker for this shard +
2205 // `EventBus::batch_senders`) has been dropped.
2206 // `EventBus::shutdown` enforces that ordering so no
2207 // event is left stranded in the channel.
2208 let recv_timeout = worker.time_until_timeout().unwrap_or(config.max_delay);
2209
2210 match tokio::time::timeout(recv_timeout, rx.recv()).await {
2211 Ok(Some(events)) => {
2212 if let Some(batch) = worker.add_events(events) {
2213 let batch_len = batch.len() as u64;
2214 if dispatch_batch(
2215 &*adapter,
2216 batch,
2217 shard_id,
2218 adapter_timeout,
2219 batch_retries,
2220 )
2221 .await
2222 {
2223 stats
2224 .batches_dispatched
2225 .fetch_add(1, AtomicOrdering::Relaxed);
2226 stats
2227 .events_dispatched
2228 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2229 if let Some(shard_ref) = shard_manager.shard(shard_id) {
2230 shard_ref.lock().record_batch_dispatch();
2231 }
2232 }
2233 }
2234 }
2235 Ok(None) => {
2236 // Channel closed — drain any pending and exit.
2237 if worker.has_pending() {
2238 let batch = worker.flush();
2239 if !batch.is_empty() {
2240 let batch_len = batch.len() as u64;
2241 if dispatch_batch(
2242 &*adapter,
2243 batch,
2244 shard_id,
2245 adapter_timeout,
2246 batch_retries,
2247 )
2248 .await
2249 {
2250 stats
2251 .batches_dispatched
2252 .fetch_add(1, AtomicOrdering::Relaxed);
2253 stats
2254 .events_dispatched
2255 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2256 }
2257 }
2258 }
2259 break;
2260 }
2261 Err(_) => {
2262 // Timeout - check if we need to flush
2263 if let Some(batch) = worker.add_events(vec![]) {
2264 let batch_len = batch.len() as u64;
2265 if dispatch_batch(
2266 &*adapter,
2267 batch,
2268 shard_id,
2269 adapter_timeout,
2270 batch_retries,
2271 )
2272 .await
2273 {
2274 stats
2275 .batches_dispatched
2276 .fetch_add(1, AtomicOrdering::Relaxed);
2277 stats
2278 .events_dispatched
2279 .fetch_add(batch_len, AtomicOrdering::Relaxed);
2280 if let Some(shard_ref) = shard_manager.shard(shard_id) {
2281 shard_ref.lock().record_batch_dispatch();
2282 }
2283 }
2284 }
2285 }
2286 }
2287 }
2288 })
2289}
2290
/// Upper bound on how long a drain worker waits for `drain_finalize_ready`
/// after observing `shutdown == true`. When the bound is hit, the worker
/// logs a possible-event-loss warning and runs its final sweep anyway.
///
/// Defense in depth: a bus dropped mid-shutdown without the gate being
/// released must not pin the worker forever. The shutdown path is meant
/// to release the gate in every case (even on its own timeout), and
/// `Drop` releases it too, so this deadline should normally go unreached.
const DRAIN_FINALIZE_TIMEOUT: Duration = Duration::from_millis(10_000);
2298
2299/// Spawn a drain worker for a single shard.
2300///
2301/// Uses a scratch `Vec` + `pop_batch_into` so the per-cycle
2302/// allocation happens *outside* the shard mutex critical section.
2303/// Each cycle: lock → drain into scratch (no alloc, capacity already
2304/// reserved) → unlock → `mem::replace` swaps the filled scratch out
2305/// for a fresh empty `Vec` (alloc *outside* the lock) → send the
2306/// filled batch on the channel.
2307fn spawn_drain_worker_for_shard(
2308 shard_id: u16,
2309 shard_manager: Arc<ShardManager>,
2310 sender: mpsc::Sender<Vec<crate::event::InternalEvent>>,
2311 shutdown: Arc<AtomicBool>,
2312 drain_finalize_ready: Arc<AtomicBool>,
2313) -> JoinHandle<()> {
2314 const STEADY_BATCH: usize = 1_000;
2315 const FINAL_BATCH: usize = 10_000;
2316
2317 tokio::spawn(async move {
2318 let mut scratch: Vec<crate::event::InternalEvent> = Vec::with_capacity(STEADY_BATCH);
2319
2320 loop {
2321 // SeqCst to match the writer side (`EventBus::shutdown` /
2322 // `Drop`). `try_enter_ingest` itself uses SeqCst, and
2323 // the Acquire/Release handshake on
2324 // `drain_finalize_ready` (below) is what actually makes
2325 // the producer-push happen-before visible. Aligning to
2326 // SeqCst here makes the contract robust to future
2327 // producer-side changes that might piggyback on
2328 // `shutdown`'s ordering.
2329 if shutdown.load(AtomicOrdering::SeqCst) {
2330 // Before doing the final sweep, wait for `shutdown()`
2331 // to release the finalize gate. The gate is set only
2332 // after the in-flight ingest counter reaches zero,
2333 // which means every producer that read `shutdown=false`
2334 // has completed its push. Without this wait, the drain
2335 // worker can race ahead of a late push under
2336 // shard-mutex serialization (drain takes the lock
2337 // first, sees nothing, exits; producer then takes the
2338 // lock and pushes — event stranded).
2339 //
2340 // Acquire pairs with the Release in `EventBus::shutdown`
2341 // and `EventBus::drop`, transitively making every
2342 // producer push that happened-before its `in_flight`
2343 // decrement visible to the subsequent `pop_batch_into`.
2344 // `tokio::time::Instant` so virtualized-clock tests
2345 // (`tokio::time::pause`) advance the deadline via
2346 // `sleep` rather than spinning until wall-clock catches
2347 // up.
2348 let finalize_deadline = tokio::time::Instant::now() + DRAIN_FINALIZE_TIMEOUT;
2349 while !drain_finalize_ready.load(AtomicOrdering::Acquire) {
2350 if tokio::time::Instant::now() >= finalize_deadline {
2351 tracing::warn!(
2352 shard_id,
2353 "drain worker timed out waiting for finalize gate; \
2354 proceeding with potential event loss"
2355 );
2356 break;
2357 }
2358 // Park instead of `yield_now` so we don't
2359 // starve the workers / producers we're waiting
2360 // on under contention.
2361 tokio::time::sleep(std::time::Duration::from_millis(1)).await;
2362 }
2363
2364 // Final drain: loop until the ring buffer is empty.
2365 // A single 10k batch is not enough — the ring
2366 // buffer can hold up to `ring_buffer_capacity`
2367 // events (default 1M) and any leftover would be
2368 // silently lost on shutdown.
2369 //
2370 // Pre-fix this broke at the first
2371 // `popped == 0`. The audit posited a narrow race
2372 // where a producer that fetch_add'd
2373 // in_flight_ingests but stalled before the
2374 // shard-lock body could push AFTER shutdown
2375 // observed in_flight=0 yet BEFORE this final
2376 // sweep saw the event. The SeqCst guard pattern
2377 // makes this unlikely (the push happens-before
2378 // the guard drop), but the defense is cheap:
2379 // require TWO consecutive zero-event passes
2380 // before declaring drain. The yield_now between
2381 // them gives a stalled producer a chance to land
2382 // the push.
2383 let mut final_scratch: Vec<crate::event::InternalEvent> =
2384 Vec::with_capacity(FINAL_BATCH);
2385 let mut consecutive_zeros = 0u32;
2386 loop {
2387 let popped = shard_manager
2388 .with_shard(shard_id, |shard| {
2389 shard.pop_batch_into(&mut final_scratch, FINAL_BATCH)
2390 })
2391 .unwrap_or(0);
2392 if popped == 0 {
2393 consecutive_zeros += 1;
2394 if consecutive_zeros >= 2 {
2395 break;
2396 }
2397 // Yield to let any racing producer commit
2398 // its push, then re-poll.
2399 tokio::task::yield_now().await;
2400 continue;
2401 }
2402 consecutive_zeros = 0;
2403 let batch =
2404 std::mem::replace(&mut final_scratch, Vec::with_capacity(FINAL_BATCH));
2405 let batch_len = batch.len();
2406 if let Err(_send_err) = sender.send(batch).await {
2407 // Batch worker exited before drain. The
2408 // `mem::replace` already pulled events out
2409 // of the ring buffer, so the dropped batch
2410 // is unrecoverable — the SendError carries
2411 // it back but the consumer is gone. Surface
2412 // the count loudly so the loss is
2413 // observable in operator dashboards rather
2414 // than a silent miss in shutdown stats.
2415 tracing::error!(
2416 shard_id,
2417 dropped = batch_len,
2418 "drain worker (final): batch worker dropped \
2419 channel before final drain completed; \
2420 events removed from ring buffer cannot be redelivered",
2421 );
2422 break;
2423 }
2424 }
2425 break;
2426 }
2427
2428 // Drain events from ring buffer.
2429 let popped = shard_manager.with_shard(shard_id, |shard| {
2430 shard.pop_batch_into(&mut scratch, STEADY_BATCH)
2431 });
2432
2433 match popped {
2434 Some(0) => {
2435 // No events — yield briefly. The 100μs sleep is deliberate:
2436 // this is a latency-first system where the drain loop is the
2437 // hot path. Longer backoff would add milliseconds of latency
2438 // to the first event after a quiet period, violating the
2439 // sub-microsecond design target. The CPU cost of 100μs polling
2440 // is acceptable for a system that processes 10M+ events/sec.
2441 tokio::time::sleep(Duration::from_micros(100)).await;
2442 }
2443 Some(_) => {
2444 let batch = std::mem::replace(&mut scratch, Vec::with_capacity(STEADY_BATCH));
2445 let batch_len = batch.len();
2446 if let Err(_send_err) = sender.send(batch).await {
2447 // Steady-state: the only way the batch
2448 // worker drops the channel is if it
2449 // panicked or `remove_shard_internal`
2450 // tore it down out of order with the
2451 // drain worker (which the documented
2452 // shutdown sequence forbids). Either way,
2453 // the events are unrecoverable — the
2454 // `mem::replace` above already pulled them
2455 // out of the ring buffer. Pre-fix this
2456 // simply `break`-d, leaving the loss
2457 // invisible. Surface a loud error with
2458 // the dropped count so an out-of-order
2459 // shutdown or batch-worker panic shows up
2460 // in dashboards rather than as a silent
2461 // metric gap.
2462 tracing::error!(
2463 shard_id,
2464 dropped = batch_len,
2465 "drain worker: batch worker dropped channel \
2466 during steady-state drain; events removed from \
2467 ring buffer cannot be redelivered",
2468 );
2469 break;
2470 }
2471 }
2472 None => {
2473 // Shard no longer exists (was removed)
2474 break;
2475 }
2476 }
2477 }
2478 })
2479}
2480
2481#[cfg(test)]
2482mod tests {
2483 use super::*;
2484 use crate::shard::ScalingPolicy;
2485 use serde_json::json;
2486
2487 #[tokio::test]
2488 async fn test_event_bus_basic() {
2489 let config = EventBusConfig::builder()
2490 .num_shards(2)
2491 .ring_buffer_capacity(1024)
2492 .build()
2493 .unwrap();
2494
2495 let bus = EventBus::new(config).await.unwrap();
2496
2497 // Ingest some events
2498 for i in 0..10 {
2499 let event = Event::new(json!({"index": i}));
2500 bus.ingest(event).unwrap();
2501 }
2502
2503 // Give workers time to process
2504 tokio::time::sleep(Duration::from_millis(100)).await;
2505
2506 // Check stats
2507 assert_eq!(
2508 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2509 10
2510 );
2511
2512 bus.shutdown().await.unwrap();
2513 }
2514
2515 #[tokio::test]
2516 async fn test_event_bus_batch_ingest() {
2517 let config = EventBusConfig::default();
2518 let bus = EventBus::new(config).await.unwrap();
2519
2520 let events: Vec<Event> = (0..100).map(|i| Event::new(json!({"i": i}))).collect();
2521
2522 let ingested = bus.ingest_batch(events);
2523 assert_eq!(ingested, 100);
2524
2525 bus.shutdown().await.unwrap();
2526 }
2527
2528 #[tokio::test]
2529 async fn test_event_bus_with_dynamic_scaling() {
2530 let policy = ScalingPolicy {
2531 min_shards: 2,
2532 max_shards: 8,
2533 ..Default::default()
2534 };
2535
2536 let config = EventBusConfig::builder()
2537 .num_shards(2)
2538 .ring_buffer_capacity(1024)
2539 .scaling(policy)
2540 .build()
2541 .unwrap();
2542
2543 let bus = EventBus::new(config).await.unwrap();
2544
2545 // Verify dynamic scaling is enabled
2546 assert!(bus.is_dynamic_scaling_enabled());
2547 assert_eq!(bus.num_shards(), 2);
2548
2549 // Ingest some events
2550 for i in 0..100 {
2551 let event = Event::new(json!({"index": i}));
2552 bus.ingest(event).unwrap();
2553 }
2554
2555 // Give workers time to process
2556 tokio::time::sleep(Duration::from_millis(100)).await;
2557
2558 // Check stats
2559 assert_eq!(
2560 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2561 100
2562 );
2563
2564 bus.shutdown().await.unwrap();
2565 }
2566
2567 #[tokio::test]
2568 async fn test_manual_scale_up() {
2569 let policy = ScalingPolicy {
2570 min_shards: 2,
2571 max_shards: 8,
2572 cooldown: Duration::from_nanos(1), // Effectively disable cooldown for test
2573 ..Default::default()
2574 };
2575
2576 let config = EventBusConfig::builder()
2577 .num_shards(2)
2578 .ring_buffer_capacity(1024)
2579 .scaling(policy)
2580 .build()
2581 .unwrap();
2582
2583 let bus = EventBus::new(config).await.unwrap();
2584
2585 assert_eq!(bus.num_shards(), 2);
2586
2587 // Manually scale up
2588 let new_ids = bus.manual_scale_up(2).await.unwrap();
2589 assert_eq!(new_ids.len(), 2);
2590 assert_eq!(bus.num_shards(), 4);
2591
2592 // Ingest events - they should be distributed across all shards
2593 for i in 0..100 {
2594 let event = Event::new(json!({"index": i}));
2595 bus.ingest(event).unwrap();
2596 }
2597
2598 tokio::time::sleep(Duration::from_millis(100)).await;
2599
2600 assert_eq!(
2601 bus.stats().events_ingested.load(AtomicOrdering::Relaxed),
2602 100
2603 );
2604
2605 bus.shutdown().await.unwrap();
2606 }
2607
2608 /// Regression for BUG_AUDIT_2026_04_30_CORE.md #82: previously
2609 /// `manual_scale_down` only called `mapper.scale_down(count)`,
2610 /// which marks shards as `Draining` but does NOT finalize them.
2611 /// Bus configs without an active scaling monitor (or callers
2612 /// shutting down before the monitor's next tick) lost any
2613 /// events queued in the drained shards' ring buffers because
2614 /// `remove_shard_internal` was never invoked. The fix runs the
2615 /// full lifecycle synchronously: scale_down → poll for empty →
2616 /// finalize_draining → remove_shard_internal.
2617 ///
2618 /// We pin this by scaling up, manually scaling down, and
2619 /// asserting that `num_shards` actually decreases — pre-fix
2620 /// the count would still reflect the Draining shards.
2621 #[tokio::test]
2622 async fn manual_scale_down_finalizes_and_removes_drained_shards() {
2623 let policy = ScalingPolicy {
2624 min_shards: 2,
2625 max_shards: 8,
2626 cooldown: Duration::from_nanos(1),
2627 ..Default::default()
2628 };
2629 let config = EventBusConfig::builder()
2630 .num_shards(2)
2631 .ring_buffer_capacity(1024)
2632 .scaling(policy)
2633 .build()
2634 .unwrap();
2635 let bus = EventBus::new(config).await.unwrap();
2636
2637 // Scale up to 4, then back down to 2.
2638 let added = bus.manual_scale_up(2).await.unwrap();
2639 assert_eq!(added.len(), 2);
2640 assert_eq!(bus.num_shards(), 4);
2641
2642 let removed = bus.manual_scale_down(2).await.unwrap();
2643 assert_eq!(
2644 removed.len(),
2645 2,
2646 "manual_scale_down must complete the lifecycle for both \
2647 requested shards (mark Draining → wait → finalize → remove)"
2648 );
2649
2650 // Pre-fix: `num_shards` would still be 4 because shards
2651 // were only marked Draining (and the routing-table removal
2652 // path never ran). Post-fix: it's back to 2.
2653 assert_eq!(
2654 bus.num_shards(),
2655 2,
2656 "drained shards must be removed from the routing table"
2657 );
2658
2659 bus.shutdown().await.unwrap();
2660 }
2661
2662 #[tokio::test]
2663 async fn test_shard_metrics() {
2664 let policy = ScalingPolicy::default();
2665
2666 let config = EventBusConfig::builder()
2667 .num_shards(2)
2668 .ring_buffer_capacity(1024)
2669 .scaling(policy)
2670 .build()
2671 .unwrap();
2672
2673 let bus = EventBus::new(config).await.unwrap();
2674
2675 // Ingest some events
2676 for i in 0..50 {
2677 let event = Event::new(json!({"index": i}));
2678 bus.ingest(event).unwrap();
2679 }
2680
2681 // Get metrics
2682 let metrics = bus.shard_metrics();
2683 assert!(metrics.is_some());
2684 let metrics = metrics.unwrap();
2685 assert_eq!(metrics.len(), 2);
2686
2687 bus.shutdown().await.unwrap();
2688 }
2689
2690 #[tokio::test]
2691 async fn test_regression_eventbus_drop_signals_shutdown() {
2692 // Regression: dropping an EventBus without calling shutdown() used to
2693 // leave background tasks running indefinitely. The Drop impl now sets
2694 // the shutdown flag so workers eventually exit.
2695 let result = tokio::time::timeout(Duration::from_secs(5), async {
2696 let config = EventBusConfig::builder()
2697 .num_shards(2)
2698 .ring_buffer_capacity(1024)
2699 .build()
2700 .unwrap();
2701
2702 let bus = EventBus::new(config).await.unwrap();
2703
2704 // Ingest some events
2705 for i in 0..10 {
2706 let event = Event::new(json!({"index": i}));
2707 bus.ingest(event).unwrap();
2708 }
2709
2710 // Drop without calling shutdown()
2711 drop(bus);
2712
2713 // If we reach here, the drop didn't hang
2714 })
2715 .await;
2716
2717 assert!(
2718 result.is_ok(),
2719 "EventBus drop should not hang — Drop impl must signal shutdown"
2720 );
2721 }
2722
2723 #[tokio::test]
2724 async fn test_with_dynamic_scaling_builder() {
2725 let config = EventBusConfig::builder()
2726 .num_shards(4)
2727 .ring_buffer_capacity(2048)
2728 .with_dynamic_scaling()
2729 .build()
2730 .unwrap();
2731
2732 let bus = EventBus::new(config).await.unwrap();
2733
2734 assert!(bus.is_dynamic_scaling_enabled());
2735 assert_eq!(bus.num_shards(), 4);
2736
2737 bus.shutdown().await.unwrap();
2738 }
2739
2740 /// Mock adapter that counts `on_batch` invocations and returns a
2741 /// configurable error variant. Used to assert dispatch retry
2742 /// semantics without dragging in a real adapter.
2743 struct CountingErrAdapter {
2744 calls: Arc<std::sync::atomic::AtomicU32>,
2745 make_err: Box<dyn Fn() -> AdapterError + Send + Sync>,
2746 }
2747
2748 #[async_trait::async_trait]
2749 impl crate::adapter::Adapter for CountingErrAdapter {
2750 async fn init(&mut self) -> Result<(), AdapterError> {
2751 Ok(())
2752 }
2753 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2754 self.calls.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
2755 Err((self.make_err)())
2756 }
2757 async fn flush(&self) -> Result<(), AdapterError> {
2758 Ok(())
2759 }
2760 async fn shutdown(&self) -> Result<(), AdapterError> {
2761 Ok(())
2762 }
2763 async fn poll_shard(
2764 &self,
2765 _shard_id: u16,
2766 _from_id: Option<&str>,
2767 _limit: usize,
2768 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2769 Ok(crate::adapter::ShardPollResult::empty())
2770 }
2771 fn name(&self) -> &'static str {
2772 "counting_err"
2773 }
2774 async fn is_healthy(&self) -> bool {
2775 true
2776 }
2777 }
2778
2779 /// Regression: BUG_REPORT.md #21 — `dispatch_batch` previously
2780 /// retried every error variant, ignoring `AdapterError::is_retryable`.
2781 /// A non-retryable error (Connection / Fatal / Serialization)
2782 /// should now drop the batch immediately rather than burn the
2783 /// retry budget on something that cannot succeed.
2784 #[tokio::test(start_paused = true)]
2785 async fn dispatch_batch_skips_retries_on_non_retryable_error() {
2786 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2787 let adapter = CountingErrAdapter {
2788 calls: calls.clone(),
2789 make_err: Box::new(|| AdapterError::Connection("refused".into())),
2790 };
2791
2792 let batch = Batch::new(0, vec![], 0);
2793 let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 5).await;
2794
2795 assert!(!ok, "non-retryable error must drop batch");
2796 assert_eq!(
2797 calls.load(std::sync::atomic::Ordering::SeqCst),
2798 1,
2799 "Connection error must not be retried; expected exactly 1 on_batch call"
2800 );
2801 }
2802
2803 /// Sanity: a retryable error *does* go through the full retry
2804 /// budget. Without this companion check, the previous test could
2805 /// pass for the wrong reason (e.g. if dispatch always returned on
2806 /// the first error).
2807 #[tokio::test(start_paused = true)]
2808 async fn dispatch_batch_retries_transient_errors() {
2809 let calls = Arc::new(std::sync::atomic::AtomicU32::new(0));
2810 let adapter = CountingErrAdapter {
2811 calls: calls.clone(),
2812 make_err: Box::new(|| AdapterError::Transient("temp".into())),
2813 };
2814
2815 let batch = Batch::new(0, vec![], 0);
2816 let ok = dispatch_batch(&adapter, batch, 0, Duration::from_secs(1), 3).await;
2817
2818 assert!(!ok);
2819 // 3 retries + 1 final attempt = 4 total calls.
2820 assert_eq!(calls.load(std::sync::atomic::Ordering::SeqCst), 4);
2821 }
2822
2823 /// Counting adapter that records the number of events delivered via
2824 /// `on_batch`. Used by shutdown-durability tests below.
2825 struct CountingAdapter {
2826 received: Arc<AtomicU64>,
2827 }
2828
2829 #[async_trait::async_trait]
2830 impl crate::adapter::Adapter for CountingAdapter {
2831 async fn init(&mut self) -> Result<(), AdapterError> {
2832 Ok(())
2833 }
2834 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
2835 self.received
2836 .fetch_add(batch.events.len() as u64, AtomicOrdering::SeqCst);
2837 Ok(())
2838 }
2839 async fn flush(&self) -> Result<(), AdapterError> {
2840 Ok(())
2841 }
2842 async fn shutdown(&self) -> Result<(), AdapterError> {
2843 Ok(())
2844 }
2845 async fn poll_shard(
2846 &self,
2847 _shard_id: u16,
2848 _from_id: Option<&str>,
2849 _limit: usize,
2850 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2851 Ok(crate::adapter::ShardPollResult::empty())
2852 }
2853 fn name(&self) -> &'static str {
2854 "counting"
2855 }
2856 async fn is_healthy(&self) -> bool {
2857 true
2858 }
2859 }
2860
/// `retry_backoff` grows a base delay exponentially with the attempt number.
/// It layers per-(shard, attempt) jitter on top so shards do not retry in
/// lockstep. This test pins both properties:
/// - each attempt lands in its own jitter window;
/// - different shard ids at the same attempt spread out.
#[test]
fn retry_backoff_grows_with_attempt_and_jitters_per_shard() {
    use std::collections::HashSet;

    // Shard 0, attempts 0..=6: base 100, 200, 400, 800, 1600, 3200 and
    // 3200 ms (capped), each with ±25% jitter. The windows below are
    // inclusive on both ends: [0.75·base, 1.25·base].
    let s0_a0 = retry_backoff(0, 0).as_millis();
    assert!((75..=125).contains(&s0_a0));

    let s0_a1 = retry_backoff(0, 1).as_millis();
    assert!((150..=250).contains(&s0_a1));

    let s0_a4 = retry_backoff(0, 4).as_millis();
    assert!((1200..=2000).contains(&s0_a4));

    let s0_a5 = retry_backoff(0, 5).as_millis();
    assert!((2400..=4000).contains(&s0_a5));

    // Past attempt 5 the base stops growing, so attempt 6 must stay within
    // attempt 5's upper bound.
    let s0_a6 = retry_backoff(0, 6).as_millis();
    assert!(
        s0_a6 <= 4000,
        "attempt > 5 must cap at the attempt-5 base; got {s0_a6}ms"
    );

    // Decorrelation: 16 shard ids at attempt 2 must produce at least 4
    // distinct delays.
    //
    // The bar is deliberately low. `DefaultHasher`'s output is not stable
    // across Rust toolchains, so a tighter bound (e.g. ≥ 8) could start
    // flaking after a stdlib update. Yet the property under test needs no
    // strong collision resistance. The real regression this guards against
    // is hashing only `attempt` and ignoring `shard_id`, which would
    // collapse all 16 samples to one value; ≥ 4 catches that.
    let mut s_attempt2 = HashSet::new();
    for shard in 0u16..16 {
        s_attempt2.insert(retry_backoff(shard, 2).as_millis());
    }
    assert!(
        s_attempt2.len() >= 4,
        "jitter must decorrelate retries across shards; \
         only {} unique backoffs across 16 shards",
        s_attempt2.len()
    );
}
2917
2918 /// CR-23: pin that `EventBus::shutdown` actually invokes the
2919 /// adapter's `flush()` and `shutdown()` methods. The existing
2920 /// `sdk/tests/shutdown_regression.rs` covers the
2921 /// "shutdown runs even with outstanding Arc clones" property
2922 /// using a memory adapter whose `flush`/`shutdown` are no-ops
2923 /// — so a regression that elided the adapter calls would still
2924 /// pass. This test uses a recording adapter that increments
2925 /// per-method counters; we assert flush AND shutdown both fired
2926 /// exactly once after a clean `bus.shutdown().await`.
2927 ///
2928 /// The fix routes `Net::shutdown` through
2929 /// `shutdown_via_ref`, which in turn calls
2930 /// `self.adapter.flush()` and `self.adapter.shutdown()` once
2931 /// each. CR-23 pins this contract at the bus layer so an
2932 /// inadvertent regression at the SDK or adapter wrapper layer
2933 /// can be caught without an integration setup.
2934 #[tokio::test]
2935 async fn cr23_shutdown_invokes_adapter_flush_and_shutdown_exactly_once() {
2936 struct RecordingAdapter {
2937 on_batch_calls: Arc<AtomicU64>,
2938 flush_calls: Arc<AtomicU64>,
2939 shutdown_calls: Arc<AtomicU64>,
2940 }
2941
2942 #[async_trait::async_trait]
2943 impl crate::adapter::Adapter for RecordingAdapter {
2944 async fn init(&mut self) -> Result<(), AdapterError> {
2945 Ok(())
2946 }
2947 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
2948 self.on_batch_calls.fetch_add(1, AtomicOrdering::SeqCst);
2949 Ok(())
2950 }
2951 async fn flush(&self) -> Result<(), AdapterError> {
2952 self.flush_calls.fetch_add(1, AtomicOrdering::SeqCst);
2953 Ok(())
2954 }
2955 async fn shutdown(&self) -> Result<(), AdapterError> {
2956 self.shutdown_calls.fetch_add(1, AtomicOrdering::SeqCst);
2957 Ok(())
2958 }
2959 async fn poll_shard(
2960 &self,
2961 _shard_id: u16,
2962 _from_id: Option<&str>,
2963 _limit: usize,
2964 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
2965 Ok(crate::adapter::ShardPollResult::empty())
2966 }
2967 fn name(&self) -> &'static str {
2968 "cr23-recording"
2969 }
2970 async fn is_healthy(&self) -> bool {
2971 true
2972 }
2973 }
2974
2975 let on_batch = Arc::new(AtomicU64::new(0));
2976 let flush = Arc::new(AtomicU64::new(0));
2977 let shutdown = Arc::new(AtomicU64::new(0));
2978 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(RecordingAdapter {
2979 on_batch_calls: on_batch.clone(),
2980 flush_calls: flush.clone(),
2981 shutdown_calls: shutdown.clone(),
2982 });
2983
2984 let config = EventBusConfig::builder()
2985 .num_shards(2)
2986 .ring_buffer_capacity(1024)
2987 .build()
2988 .unwrap();
2989 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
2990
2991 // Drive a small burst so on_batch fires at least once —
2992 // pins that the adapter is wired up correctly. The
2993 // load-bearing assertions below are on flush and shutdown.
2994 for i in 0..16 {
2995 let _ = bus.ingest(Event::new(json!({"i": i})));
2996 }
2997
2998 // Pre-CR-23 a regression that elided one of these would
2999 // pass `shutdown_regression.rs::shutdown_runs_even_with_outstanding_event_stream`
3000 // because the memory adapter's flush/shutdown are no-ops.
3001 // Here the recording adapter makes the contract observable.
3002 bus.shutdown().await.unwrap();
3003
3004 assert!(
3005 on_batch.load(AtomicOrdering::SeqCst) > 0,
3006 "sanity: on_batch must have fired at least once"
3007 );
3008 assert_eq!(
3009 flush.load(AtomicOrdering::SeqCst),
3010 1,
3011 "CR-23 regression: shutdown MUST call adapter.flush() exactly once"
3012 );
3013 assert_eq!(
3014 shutdown.load(AtomicOrdering::SeqCst),
3015 1,
3016 "CR-23 regression: shutdown MUST call adapter.shutdown() exactly once"
3017 );
3018 }
3019
3020 /// CR-25: pin that a SECOND caller of `shutdown_via_ref` whose
3021 /// CAS loses (because a first caller already flipped the
3022 /// `shutdown` flag) and whose deadline elapses BEFORE the
3023 /// first caller sets `shutdown_completed=true` returns
3024 /// `AdapterError::Transient(_)` — NOT a silent `Ok(())`.
3025 ///
3026 /// Pre-CR-25 both branches returned `Ok`. A caller that lost
3027 /// the CAS race had no way to distinguish "first caller
3028 /// finished shutdown" from "deadline timed out mid-shutdown."
3029 /// Under a slow adapter (`adapter_timeout` default 30s >
3030 /// the 10s spin deadline), the second caller silently saw
3031 /// `Ok` while the bus was still mid-shutdown — letting
3032 /// subsequent code observe a partially-shut-down bus.
3033 ///
3034 /// We use a slow first caller (sleeps inside `flush()`)
3035 /// and override the spin deadline to a few ms so the test
3036 /// runs fast.
3037 #[tokio::test]
3038 async fn cr25_second_caller_returns_transient_when_deadline_elapses() {
3039 struct SlowFlushAdapter {
3040 // Block flush() for this long. The first caller
3041 // gets stuck here while the second caller's spin
3042 // deadline elapses.
3043 flush_delay: std::time::Duration,
3044 }
3045
3046 #[async_trait::async_trait]
3047 impl crate::adapter::Adapter for SlowFlushAdapter {
3048 async fn init(&mut self) -> Result<(), AdapterError> {
3049 Ok(())
3050 }
3051 async fn on_batch(&self, _batch: Batch) -> Result<(), AdapterError> {
3052 Ok(())
3053 }
3054 async fn flush(&self) -> Result<(), AdapterError> {
3055 tokio::time::sleep(self.flush_delay).await;
3056 Ok(())
3057 }
3058 async fn shutdown(&self) -> Result<(), AdapterError> {
3059 Ok(())
3060 }
3061 async fn poll_shard(
3062 &self,
3063 _shard_id: u16,
3064 _from_id: Option<&str>,
3065 _limit: usize,
3066 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3067 Ok(crate::adapter::ShardPollResult::empty())
3068 }
3069 fn name(&self) -> &'static str {
3070 "cr25-slow-flush"
3071 }
3072 async fn is_healthy(&self) -> bool {
3073 true
3074 }
3075 }
3076
3077 // Cubic P2: serialize access to the global deadline
3078 // override so concurrent tests don't interfere. Hold the
3079 // guard until the override is reset to 0 below.
3080 let _override_guard = super::shutdown_deadline_override_lock().await;
3081
3082 // Override the second-caller spin deadline to a short
3083 // value so the test doesn't wall-clock-wait 10s. Production
3084 // builds use the 10s default (the cfg(test) override is
3085 // compiled out).
3086 super::set_shutdown_via_ref_spin_deadline_for_test(50);
3087
3088 let config = EventBusConfig::builder()
3089 .num_shards(2)
3090 .ring_buffer_capacity(1024)
3091 .adapter_timeout(std::time::Duration::from_secs(5))
3092 .build()
3093 .unwrap();
3094 // First caller's flush() sleeps 500ms — far longer than
3095 // the 50ms spin deadline.
3096 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(SlowFlushAdapter {
3097 flush_delay: std::time::Duration::from_millis(500),
3098 });
3099 let bus = Arc::new(EventBus::new_with_adapter(config, adapter).await.unwrap());
3100
3101 // Spawn the FIRST caller — it wins the CAS and proceeds
3102 // into the slow flush. We don't await it; we want it
3103 // running in parallel.
3104 let bus_first = Arc::clone(&bus);
3105 let first_caller = tokio::spawn(async move { bus_first.shutdown_via_ref().await });
3106
3107 // Cubic P2: poll `is_shutdown()` until the first caller
3108 // has set the flag, instead of a fixed sleep. This makes
3109 // the test scheduler-independent — we proceed as soon as
3110 // the first caller has won the CAS, regardless of how
3111 // tokio happens to schedule things. Bounded by a 1s
3112 // timeout so a regression that prevents `shutdown_via_ref`
3113 // from setting the flag doesn't hang the test.
3114 tokio::time::timeout(std::time::Duration::from_secs(1), async {
3115 while !bus.is_shutdown() {
3116 tokio::task::yield_now().await;
3117 }
3118 })
3119 .await
3120 .expect("first caller did not set shutdown flag within 1s");
3121
3122 // The second caller's CAS will fail; it enters the spin
3123 // and times out at 50ms.
3124 let start = std::time::Instant::now();
3125 let second_result = bus.shutdown_via_ref().await;
3126 let elapsed = start.elapsed();
3127
3128 // Reset the override for other tests, then drop the guard.
3129 super::set_shutdown_via_ref_spin_deadline_for_test(0);
3130
3131 // CR-25 contract: the second caller MUST get a Transient
3132 // error, not a silent Ok.
3133 let err = second_result.expect_err(
3134 "CR-25 regression: second caller MUST surface AdapterError::Transient \
3135 when its deadline elapses, NOT a silent Ok",
3136 );
3137 match err {
3138 AdapterError::Transient(msg) => {
3139 assert!(
3140 msg.contains("deadline elapsed") || msg.contains("mid-shutdown"),
3141 "error message must reference the deadline path; got: {msg}"
3142 );
3143 }
3144 other => panic!("expected Transient, got {:?}", other),
3145 }
3146
3147 // Sanity: the second caller's elapsed time is bounded by
3148 // the override (50ms) + scheduler slop. If this is
3149 // anywhere near the production 10s, the cfg(test)
3150 // override path broke.
3151 assert!(
3152 elapsed < std::time::Duration::from_secs(5),
3153 "second caller took {elapsed:?}; the cfg(test) deadline override \
3154 broke if this is near the 10s production default"
3155 );
3156
3157 // Wait for the first caller to finish. Even though it
3158 // took the slow path, the bus IS shutting down and will
3159 // eventually complete.
3160 let _ = first_caller.await.unwrap();
3161 assert!(bus.is_shutdown_completed());
3162 }
3163
3164 /// Regression: BUG_REPORT.md #6 — `shutdown()` must deliver every
3165 /// successfully-ingested event to the adapter before returning.
3166 /// Pins the broader durability contract that the
3167 /// `drain_finalize_ready` gate supports: the drain worker may not
3168 /// finalize until the in-flight wait completes.
3169 ///
3170 /// Tests across many shards with bursts large enough that the
3171 /// drain workers are mid-loop when shutdown begins.
3172 #[tokio::test]
3173 async fn shutdown_delivers_every_successful_ingest_to_adapter() {
3174 let received = Arc::new(AtomicU64::new(0));
3175 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3176 received: received.clone(),
3177 });
3178
3179 let config = EventBusConfig::builder()
3180 .num_shards(4)
3181 .ring_buffer_capacity(4096)
3182 .build()
3183 .unwrap();
3184 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3185
3186 // Drive a sizable burst across all shards. Capacity > burst so
3187 // we don't trip backpressure; every successful Ok must reach
3188 // `on_batch` before shutdown returns.
3189 let total = 10_000usize;
3190 let mut successes = 0u64;
3191 for i in 0..total {
3192 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3193 successes += 1;
3194 }
3195 }
3196
3197 // Shutdown awaits drain workers; with the BUG_REPORT.md #6 fix
3198 // those workers wait on `drain_finalize_ready` after observing
3199 // `shutdown=true`, so any push the producer made before the
3200 // shutdown flag is guaranteed to be in the ring buffer when
3201 // the final sweep runs.
3202 bus.shutdown().await.unwrap();
3203
3204 let delivered = received.load(AtomicOrdering::SeqCst);
3205 assert_eq!(
3206 delivered, successes,
3207 "shutdown stranded events: {successes} ingested successfully, \
3208 only {delivered} reached the adapter"
3209 );
3210 }
3211
3212 /// Regression: BUG_REPORT.md #16 — `flush()` must be a delivery
3213 /// barrier: after it returns successfully, every event the
3214 /// caller successfully ingested before `flush()` was called
3215 /// must have been handed to the adapter via `on_batch`.
3216 /// The previous implementation slept a single `batch.max_delay`
3217 /// after the ring buffers drained, which left a window where
3218 /// events could still be sitting in the per-shard mpsc channel
3219 /// or inside a partially-filled batch awaiting timeout — those
3220 /// events were silently dropped from the flush guarantee.
3221 #[tokio::test]
3222 async fn flush_is_a_delivery_barrier() {
3223 let received = Arc::new(AtomicU64::new(0));
3224 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3225 received: received.clone(),
3226 });
3227
3228 // Use a deliberately *long* batch.max_delay (250ms) so that a
3229 // partially-filled batch sitting in the batch worker's
3230 // pending state would survive past the old single-`max_delay`
3231 // sleep. min_size > burst forces the partial-batch path.
3232 let config = EventBusConfig::builder()
3233 .num_shards(2)
3234 .ring_buffer_capacity(1024)
3235 .batch(crate::config::BatchConfig {
3236 min_size: 1_000,
3237 max_size: 10_000,
3238 max_delay: Duration::from_millis(250),
3239 adaptive: false,
3240 velocity_window: Duration::from_millis(100),
3241 })
3242 .build()
3243 .unwrap();
3244 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3245
3246 // A small burst — far below `min_size`, so the batch worker
3247 // will sit on a partial batch waiting for `max_delay`.
3248 let burst = 50usize;
3249 let mut successes = 0u64;
3250 for i in 0..burst {
3251 if bus.ingest(Event::new(json!({"i": i}))).is_ok() {
3252 successes += 1;
3253 }
3254 }
3255
3256 // Time the flush call to confirm we waited long enough for
3257 // the partial batch to time out. The previous code slept
3258 // ~10ms total in the post-empty phase; the fix waits up to
3259 // `max_delay * num_workers` (here 500ms cap, capped at 2s).
3260 let t0 = std::time::Instant::now();
3261 bus.flush().await.unwrap();
3262 let elapsed = t0.elapsed();
3263
3264 // After flush returns, every successful ingest must have
3265 // been delivered to the adapter. With the old code this
3266 // assertion would fail: events sit in the partial batch
3267 // until `max_delay` (250ms) elapses, but flush returned
3268 // after only ~10ms.
3269 let delivered = received.load(AtomicOrdering::SeqCst);
3270 assert_eq!(
3271 delivered, successes,
3272 "flush() returned but only {delivered} of {successes} \
3273 events reached the adapter (#16); flush waited {:?}",
3274 elapsed
3275 );
3276
3277 bus.shutdown().await.unwrap();
3278 }
3279
3280 /// Regression (Phase 1): when configured with a
3281 /// persistent `producer_nonce_path`, two bus instances launched
3282 /// against the same path stamp the SAME nonce on every emitted
3283 /// batch. JetStream / Redis adapters key dedup on this nonce, so
3284 /// a producer that crashed mid-batch and restarted (within the
3285 /// backend's dedup window) issues retries with the same msg-ids
3286 /// and the backend correctly recognizes them as duplicates.
3287 ///
3288 /// Pre-fix the per-process nonce regenerated on every startup,
3289 /// so post-crash retries wrote NEW msg-ids and the backend
3290 /// persisted the partial-batch's accepted half twice.
3291 #[tokio::test]
3292 async fn persistent_producer_nonce_survives_bus_restart() {
3293 // Use a per-test temp file so concurrent runs don't collide.
3294 let mut nonce_path = std::env::temp_dir();
3295 let pid = std::process::id();
3296 let nanos = std::time::SystemTime::now()
3297 .duration_since(std::time::UNIX_EPOCH)
3298 .map(|d| d.as_nanos())
3299 .unwrap_or(0);
3300 nonce_path.push(format!("net-test-bus-nonce-{pid}-{nanos}"));
3301
3302 let make_config = |path: &std::path::Path| {
3303 EventBusConfig::builder()
3304 .num_shards(1)
3305 .ring_buffer_capacity(1024)
3306 .producer_nonce_path(path)
3307 .build()
3308 .unwrap()
3309 };
3310
3311 // First bus: ingest one event. Read its nonce off the
3312 // adapter-bound batch via a recording adapter.
3313 struct NonceRecordingAdapter {
3314 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3315 }
3316 #[async_trait::async_trait]
3317 impl crate::adapter::Adapter for NonceRecordingAdapter {
3318 async fn init(&mut self) -> Result<(), AdapterError> {
3319 Ok(())
3320 }
3321 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3322 *self.nonce.lock() = Some(batch.process_nonce);
3323 Ok(())
3324 }
3325 async fn flush(&self) -> Result<(), AdapterError> {
3326 Ok(())
3327 }
3328 async fn shutdown(&self) -> Result<(), AdapterError> {
3329 Ok(())
3330 }
3331 async fn poll_shard(
3332 &self,
3333 _: u16,
3334 _: Option<&str>,
3335 _: usize,
3336 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3337 Ok(crate::adapter::ShardPollResult::empty())
3338 }
3339 fn name(&self) -> &'static str {
3340 "nonce-recording"
3341 }
3342 }
3343
3344 let nonce_first_run = Arc::new(parking_lot::Mutex::new(None));
3345 {
3346 let bus = EventBus::new_with_adapter(
3347 make_config(&nonce_path),
3348 Box::new(NonceRecordingAdapter {
3349 nonce: nonce_first_run.clone(),
3350 }),
3351 )
3352 .await
3353 .unwrap();
3354 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3355 bus.flush().await.unwrap();
3356 bus.shutdown().await.unwrap();
3357 }
3358
3359 let nonce_second_run = Arc::new(parking_lot::Mutex::new(None));
3360 {
3361 let bus = EventBus::new_with_adapter(
3362 make_config(&nonce_path),
3363 Box::new(NonceRecordingAdapter {
3364 nonce: nonce_second_run.clone(),
3365 }),
3366 )
3367 .await
3368 .unwrap();
3369 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3370 bus.flush().await.unwrap();
3371 bus.shutdown().await.unwrap();
3372 }
3373
3374 let n_a = nonce_first_run
3375 .lock()
3376 .expect("first bus must have dispatched a batch");
3377 let n_b = nonce_second_run
3378 .lock()
3379 .expect("second bus must have dispatched a batch");
3380 assert_eq!(
3381 n_a, n_b,
3382 "two bus instances against the same producer_nonce_path \
3383 must stamp the same nonce — pre-fix this regenerated on \
3384 every restart and JetStream's dedup window saw new \
3385 msg-ids as fresh batches",
3386 );
3387
3388 // Cleanup.
3389 let _ = std::fs::remove_file(&nonce_path);
3390 }
3391
3392 /// Pin that ALL spawn sites — both the static initial-shard
3393 /// loop in `new_with_adapter` and the dynamic-add path in
3394 /// `add_shard_internal` — clone the bus's loaded
3395 /// `producer_nonce` correctly. Pre-#56 there was no nonce
3396 /// concept at the bus layer; if any future refactor drops the
3397 /// `producer_nonce: self.producer_nonce` line from one of the
3398 /// spawn sites (or stops loading the persistent path), the
3399 /// post-scale-up shard's batches would carry a different nonce
3400 /// and JetStream's cross-restart dedup would silently break for
3401 /// events ingested into the dynamic shard. Pin all observed
3402 /// batches across the static + dynamic shards share the bus's
3403 /// nonce.
3404 #[tokio::test]
3405 async fn multi_shard_bus_stamps_consistent_nonce_across_static_and_dynamic_shards() {
3406 let mut nonce_path = std::env::temp_dir();
3407 let pid = std::process::id();
3408 let nanos = std::time::SystemTime::now()
3409 .duration_since(std::time::UNIX_EPOCH)
3410 .map(|d| d.as_nanos())
3411 .unwrap_or(0);
3412 nonce_path.push(format!("net-test-multi-shard-nonce-{pid}-{nanos}"));
3413
3414 struct CollectingAdapter {
3415 nonces: Arc<parking_lot::Mutex<Vec<u64>>>,
3416 }
3417 #[async_trait::async_trait]
3418 impl crate::adapter::Adapter for CollectingAdapter {
3419 async fn init(&mut self) -> Result<(), AdapterError> {
3420 Ok(())
3421 }
3422 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3423 self.nonces.lock().push(batch.process_nonce);
3424 Ok(())
3425 }
3426 async fn flush(&self) -> Result<(), AdapterError> {
3427 Ok(())
3428 }
3429 async fn shutdown(&self) -> Result<(), AdapterError> {
3430 Ok(())
3431 }
3432 async fn poll_shard(
3433 &self,
3434 _: u16,
3435 _: Option<&str>,
3436 _: usize,
3437 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3438 Ok(crate::adapter::ShardPollResult::empty())
3439 }
3440 fn name(&self) -> &'static str {
3441 "collecting"
3442 }
3443 }
3444
3445 let nonces = Arc::new(parking_lot::Mutex::new(Vec::new()));
3446 let policy = ScalingPolicy {
3447 min_shards: 1,
3448 max_shards: 8,
3449 cooldown: Duration::from_nanos(1),
3450 ..Default::default()
3451 };
3452 let config = EventBusConfig::builder()
3453 .num_shards(2)
3454 .ring_buffer_capacity(1024)
3455 .scaling(policy)
3456 .producer_nonce_path(&nonce_path)
3457 .build()
3458 .unwrap();
3459
3460 let bus = EventBus::new_with_adapter(
3461 config,
3462 Box::new(CollectingAdapter {
3463 nonces: nonces.clone(),
3464 }),
3465 )
3466 .await
3467 .unwrap();
3468
3469 // Drive the two static shards.
3470 for i in 0..200u64 {
3471 let _ = bus.ingest(Event::new(json!({"i": i})));
3472 }
3473 bus.flush().await.unwrap();
3474
3475 // Add a dynamic shard and drive it too.
3476 let _ = bus.manual_scale_up(1).await.unwrap();
3477 for i in 200..400u64 {
3478 let _ = bus.ingest(Event::new(json!({"i": i})));
3479 }
3480 bus.flush().await.unwrap();
3481
3482 bus.shutdown().await.unwrap();
3483
3484 let observed = nonces.lock().clone();
3485 assert!(
3486 !observed.is_empty(),
3487 "expected the adapter to have observed at least one batch",
3488 );
3489 let first = observed[0];
3490 for (i, &n) in observed.iter().enumerate() {
3491 assert_eq!(
3492 n, first,
3493 "batch {i} stamped a different nonce ({n:#x}) than the first \
3494 batch ({first:#x}) — at least one spawn site (initial-shard \
3495 loop or `add_shard_internal`) failed to inherit the bus's \
3496 producer_nonce",
3497 );
3498 }
3499
3500 let _ = std::fs::remove_file(&nonce_path);
3501 }
3502
3503 /// Regression guard for the `(process_nonce, shard_id,
3504 /// sequence_start, i)` JetStream / Redis msg-id construction.
3505 /// Within one bus instance, `sequence_start` per shard MUST be
3506 /// strictly monotonic across batches AND every two adjacent
3507 /// batches `n` and `n+1` from the same shard MUST satisfy
3508 /// `seq_start[n+1] == seq_start[n] + len(events[n])` — no gaps,
3509 /// no overlap. A regression here breaks the at-most-once dedup
3510 /// every persistent adapter relies on: gaps reuse a `(shard,
3511 /// seq, i)` tuple after the dedup window closes, and overlap
3512 /// silently overlays a later batch on an earlier one's slot.
3513 ///
3514 /// The cross-restart variant (persistent `next_sequence` across
3515 /// process boots) is feature-shaped, not a bug fix — without
3516 /// persistence, restart relies on `process_nonce` rotating to
3517 /// disjoin the msg-id namespace (pinned by
3518 /// `persistent_producer_nonce_survives_bus_restart`). This test
3519 /// pins the within-process invariant the persistent nonce
3520 /// builds on.
3521 #[tokio::test]
3522 async fn sequence_start_is_per_shard_monotonic_and_gap_free() {
3523 struct ShardSeqRecorder {
3524 batches: Arc<parking_lot::Mutex<Vec<(u16, u64, usize)>>>,
3525 }
3526 #[async_trait::async_trait]
3527 impl crate::adapter::Adapter for ShardSeqRecorder {
3528 async fn init(&mut self) -> Result<(), AdapterError> {
3529 Ok(())
3530 }
3531 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3532 self.batches.lock().push((
3533 batch.shard_id,
3534 batch.sequence_start,
3535 batch.events.len(),
3536 ));
3537 Ok(())
3538 }
3539 async fn flush(&self) -> Result<(), AdapterError> {
3540 Ok(())
3541 }
3542 async fn shutdown(&self) -> Result<(), AdapterError> {
3543 Ok(())
3544 }
3545 async fn poll_shard(
3546 &self,
3547 _: u16,
3548 _: Option<&str>,
3549 _: usize,
3550 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3551 Ok(crate::adapter::ShardPollResult::empty())
3552 }
3553 fn name(&self) -> &'static str {
3554 "shard-seq-recorder"
3555 }
3556 }
3557
3558 let batches = Arc::new(parking_lot::Mutex::new(Vec::new()));
3559 let config = EventBusConfig::builder()
3560 .num_shards(4)
3561 .ring_buffer_capacity(1024)
3562 .build()
3563 .unwrap();
3564 let bus = EventBus::new_with_adapter(
3565 config,
3566 Box::new(ShardSeqRecorder {
3567 batches: batches.clone(),
3568 }),
3569 )
3570 .await
3571 .unwrap();
3572
3573 // Three drive-then-flush rounds so each shard sees multiple
3574 // batches; the across-batch monotonicity invariant is what
3575 // we're pinning, so a single batch per shard wouldn't
3576 // exercise it.
3577 for round in 0..3u64 {
3578 for i in 0..200u64 {
3579 bus.ingest(Event::new(json!({"r": round, "i": i}))).unwrap();
3580 }
3581 bus.flush().await.unwrap();
3582 }
3583 bus.shutdown().await.unwrap();
3584
3585 let observed = batches.lock().clone();
3586 assert!(
3587 !observed.is_empty(),
3588 "expected the adapter to have observed at least one batch",
3589 );
3590
3591 // Bucket batches per shard, preserving dispatch order.
3592 let mut by_shard: std::collections::HashMap<u16, Vec<(u64, usize)>> =
3593 std::collections::HashMap::new();
3594 for (shard, seq_start, len) in observed {
3595 by_shard.entry(shard).or_default().push((seq_start, len));
3596 }
3597
3598 for (shard, runs) in &by_shard {
3599 assert!(
3600 !runs.is_empty(),
3601 "shard {shard}: must have at least one batch",
3602 );
3603 // First batch of every shard starts at the per-shard
3604 // zero — `BatchWorker::next_sequence` is initialized to
3605 // 0 and the first `flush()` reads it before the
3606 // `saturating_add`. A regression that lazily seeds
3607 // `next_sequence` from a non-zero source (e.g. wall
3608 // clock) would trip here.
3609 assert_eq!(
3610 runs[0].0, 0,
3611 "shard {shard}: first batch must start at sequence 0, got {}",
3612 runs[0].0,
3613 );
3614 for window in runs.windows(2) {
3615 let (prev_start, prev_len) = window[0];
3616 let (next_start, _next_len) = window[1];
3617 let expected_next = prev_start
3618 .checked_add(prev_len as u64)
3619 .expect("test bounds keep us well below u64::MAX");
3620 assert!(
3621 next_start > prev_start,
3622 "shard {shard}: sequence_start must be strictly monotonic; \
3623 saw {prev_start} → {next_start}",
3624 );
3625 assert_eq!(
3626 next_start, expected_next,
3627 "shard {shard}: gap or overlap in sequence_start; \
3628 prev=({prev_start}, len={prev_len}) → next={next_start}, \
3629 expected {expected_next}. A gap reuses (shard, seq, i) \
3630 tuples after the JetStream/Redis dedup window closes; an \
3631 overlap silently overlays a later batch on an earlier \
3632 one's slot.",
3633 );
3634 }
3635 }
3636 }
3637
3638 /// Pin the within-process caching contract for the fallback
3639 /// (no-`producer_nonce_path`) path: two bus instances created
3640 /// in the same process see the SAME `batch_process_nonce()`
3641 /// because the helper is `OnceLock`-cached. The
3642 /// "different-across-restarts" semantic is a *process-level*
3643 /// guarantee — restart the process to get a fresh nonce — and
3644 /// is pinned by `persistent_producer_nonce_survives_bus_restart`
3645 /// (which uses a path; the without-path branch of #56 has no
3646 /// cross-restart guarantee by design).
3647 ///
3648 /// Cubic-ai P3: this test was previously named
3649 /// `process_nonce_fallback_differs_across_bus_instances`, which
3650 /// contradicted its own assertion (`assert_eq!(n_a, n_b)`).
3651 /// Renamed to match what it actually pins.
3652 #[tokio::test]
3653 async fn process_nonce_fallback_is_cached_within_process() {
3654 struct NonceRecordingAdapter {
3655 nonce: Arc<parking_lot::Mutex<Option<u64>>>,
3656 }
3657 #[async_trait::async_trait]
3658 impl crate::adapter::Adapter for NonceRecordingAdapter {
3659 async fn init(&mut self) -> Result<(), AdapterError> {
3660 Ok(())
3661 }
3662 async fn on_batch(&self, batch: Batch) -> Result<(), AdapterError> {
3663 *self.nonce.lock() = Some(batch.process_nonce);
3664 Ok(())
3665 }
3666 async fn flush(&self) -> Result<(), AdapterError> {
3667 Ok(())
3668 }
3669 async fn shutdown(&self) -> Result<(), AdapterError> {
3670 Ok(())
3671 }
3672 async fn poll_shard(
3673 &self,
3674 _: u16,
3675 _: Option<&str>,
3676 _: usize,
3677 ) -> Result<crate::adapter::ShardPollResult, AdapterError> {
3678 Ok(crate::adapter::ShardPollResult::empty())
3679 }
3680 fn name(&self) -> &'static str {
3681 "nonce-recording"
3682 }
3683 }
3684
3685 let cfg = || {
3686 EventBusConfig::builder()
3687 .num_shards(1)
3688 .ring_buffer_capacity(1024)
3689 .build()
3690 .unwrap()
3691 };
3692
3693 let n_a = Arc::new(parking_lot::Mutex::new(None));
3694 let n_b = Arc::new(parking_lot::Mutex::new(None));
3695 {
3696 let bus = EventBus::new_with_adapter(
3697 cfg(),
3698 Box::new(NonceRecordingAdapter { nonce: n_a.clone() }),
3699 )
3700 .await
3701 .unwrap();
3702 bus.ingest(Event::new(json!({"i": 1}))).unwrap();
3703 bus.flush().await.unwrap();
3704 bus.shutdown().await.unwrap();
3705 }
3706 {
3707 let bus = EventBus::new_with_adapter(
3708 cfg(),
3709 Box::new(NonceRecordingAdapter { nonce: n_b.clone() }),
3710 )
3711 .await
3712 .unwrap();
3713 bus.ingest(Event::new(json!({"i": 2}))).unwrap();
3714 bus.flush().await.unwrap();
3715 bus.shutdown().await.unwrap();
3716 }
3717
3718 // Note: in a single-process test BOTH bus instances see the
3719 // same `OnceLock`-cached `batch_process_nonce`, so the
3720 // nonces ARE equal here even though the documented
3721 // semantic is "fresh per process." This test pins the
3722 // cached-within-a-process invariant; the across-PROCESSES
3723 // semantic is exercised by the persistent-nonce test
3724 // above (which is the actually-load-bearing path for the
3725 // persistent-nonce fix).
3726 let n_a = n_a.lock().unwrap();
3727 let n_b = n_b.lock().unwrap();
3728 assert_eq!(
3729 n_a, n_b,
3730 "within one process, batch_process_nonce is OnceLock-cached \
3731 so two bus instances see the same nonce — the \
3732 different-across-restarts contract is process-level, \
3733 pinned via `persistent_producer_nonce_survives_bus_restart`",
3734 );
3735 }
3736
3737 /// Regression: `EventBusStats::batches_dispatched`
3738 /// (and the new `events_dispatched`) must actually increment on
3739 /// every successful adapter dispatch. Pre-fix `batches_dispatched`
3740 /// was declared but never updated, so flush()'s Phase 2 progress
3741 /// gate was constant-zero and early-broke after one window —
3742 /// flake on Windows-class timer resolution. Pin both counters
3743 /// directly here so a future refactor that drops the increment
3744 /// fails this test, not the timing-dependent
3745 /// `flush_is_a_delivery_barrier`.
3746 #[tokio::test]
3747 async fn dispatch_increments_bus_level_event_and_batch_counters() {
3748 let received = Arc::new(AtomicU64::new(0));
3749 let adapter: Box<dyn crate::adapter::Adapter> = Box::new(CountingAdapter {
3750 received: received.clone(),
3751 });
3752
3753 let config = EventBusConfig::builder()
3754 .num_shards(2)
3755 .ring_buffer_capacity(1024)
3756 .batch(crate::config::BatchConfig {
3757 min_size: 1,
3758 max_size: 10,
3759 max_delay: Duration::from_millis(10),
3760 adaptive: false,
3761 velocity_window: Duration::from_millis(100),
3762 })
3763 .build()
3764 .unwrap();
3765 let bus = EventBus::new_with_adapter(config, adapter).await.unwrap();
3766
3767 for i in 0..50 {
3768 bus.ingest(Event::new(json!({"i": i}))).unwrap();
3769 }
3770 bus.flush().await.unwrap();
3771
3772 let batches = bus.stats().batches_dispatched.load(AtomicOrdering::Acquire);
3773 let events = bus.stats().events_dispatched.load(AtomicOrdering::Acquire);
3774 assert!(
3775 batches > 0,
3776 "batches_dispatched must be > 0 after flush — pre-fix it was \
3777 never incremented anywhere, breaking flush()'s Phase 2 progress gate",
3778 );
3779 assert_eq!(
3780 events, 50,
3781 "events_dispatched must equal the number of events handed to \
3782 the adapter (got {events}, expected 50)",
3783 );
3784
3785 bus.shutdown().await.unwrap();
3786 }
3787
3788 /// Regression: BUG_REPORT.md #6 — drop-without-shutdown must
3789 /// still release the drain-finalize gate so detached drain
3790 /// workers can exit instead of parking on the gate until the
3791 /// internal `DRAIN_FINALIZE_TIMEOUT` deadline. Pinning this
3792 /// keeps the `Drop` impl honest if someone refactors the
3793 /// shutdown gates later.
3794 #[tokio::test]
3795 async fn drop_releases_drain_finalize_gate_promptly() {
3796 let config = EventBusConfig::builder()
3797 .num_shards(2)
3798 .ring_buffer_capacity(1024)
3799 .build()
3800 .unwrap();
3801 let bus = EventBus::new(config).await.unwrap();
3802 let drain_gate = bus.drain_finalize_ready.clone();
3803
3804 // Drop without an awaited shutdown.
3805 drop(bus);
3806
3807 // The Drop impl must have set the gate. `DRAIN_FINALIZE_TIMEOUT`
3808 // is 10s; if Drop didn't flip the gate, drain workers would
3809 // park for up to that long before exiting.
3810 assert!(
3811 drain_gate.load(AtomicOrdering::Acquire),
3812 "Drop must release `drain_finalize_ready` so detached drain \
3813 workers exit promptly"
3814 );
3815 }
3816}