net/shard/batch.rs
1//! Batch aggregation with adaptive sizing.
2//!
3//! The batch worker continuously drains events from a shard's ring buffer,
4//! assembles them into batches, and dispatches them to the adapter.
5//!
6//! # Adaptive Sizing
7//!
8//! Batch size is dynamically adjusted based on ingestion velocity:
9//! - High velocity → larger batches → fewer adapter calls → higher throughput
10//! - Low velocity → smaller batches → lower latency → faster flush
11
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use crate::config::BatchConfig;
18use crate::event::{Batch, InternalEvent};
19
/// Upper bound on the number of entries kept in
/// `AdaptiveBatcher::velocity_samples`, so per-shard memory stays bounded
/// under high throughput.
///
/// `calculate_velocity` only inspects the oldest (`front()`) and the newest
/// (`back()`) sample, so everything in between is pure overhead. At
/// 1 024 entries × ~24 bytes per `(Instant, u64)` tuple the deque tops out
/// at roughly 24 KiB per shard — well below the ~240 KiB worst case that
/// was possible before the cap existed (100 k events/s × 100 ms
/// `velocity_window`).
const VELOCITY_SAMPLES_CAP: usize = 1_024;
27
28/// Adaptive batch size calculator.
29///
30/// Tracks recent ingestion velocity and adjusts batch size accordingly.
31pub struct AdaptiveBatcher {
32 /// Configuration.
33 config: BatchConfig,
34 /// Current target batch size.
35 current_batch_size: usize,
36 /// Velocity samples: (timestamp, cumulative_count).
37 velocity_samples: VecDeque<(Instant, u64)>,
38 /// Total events seen (for velocity calculation).
39 total_events: u64,
40 /// Last recalculation time.
41 last_recalc: Instant,
42}
43
44impl AdaptiveBatcher {
45 /// Create a new adaptive batcher.
46 pub fn new(config: BatchConfig) -> Self {
47 Self {
48 current_batch_size: config.min_size,
49 velocity_samples: VecDeque::with_capacity(100),
50 total_events: 0,
51 last_recalc: Instant::now(),
52 config,
53 }
54 }
55
56 /// Record events and get the current target batch size.
57 ///
58 /// Call this each time events are drained from the ring buffer.
59 #[inline]
60 pub fn record_events(&mut self, count: usize) -> usize {
61 // Saturating-add: a stream that's ingested ~2^64 events
62 // is already in trouble, but a wrap from `u64::MAX` to a
63 // small value would interact with the
64 // `saturating_sub(oldest_count)` in `calculate_velocity`
65 // — the saturating-sub would underflow to 0 across the
66 // wraparound boundary and `velocity` would collapse to 0,
67 // forcing the batcher to its `min_size` floor right when
68 // sustained high throughput is exactly what the adaptive
69 // path was meant to handle. Saturating instead clamps at
70 // `u64::MAX` and `newest - oldest = 0` is the documented
71 // stop state.
72 self.total_events = self.total_events.saturating_add(count as u64);
73
74 if !self.config.adaptive {
75 return self.config.max_size;
76 }
77
78 let now = Instant::now();
79
80 // Add sample
81 self.velocity_samples.push_back((now, self.total_events));
82
83 // Remove old samples outside the time window.
84 //
85 // `Instant - Duration` panics on underflow, and on Windows
86 // `Instant` is QPC-relative to boot — a process that
87 // starts within `velocity_window` (typically a few
88 // seconds) of boot would abort the batch worker task
89 // here. `checked_sub` returns `None` on underflow; in
90 // that case skip the time-based eviction (every existing
91 // sample is "newer than the window floor" by definition,
92 // since the floor predates `Instant::now()`'s zero point).
93 // The sample-count cap below still bounds memory.
94 if let Some(window_start) = now.checked_sub(self.config.velocity_window) {
95 while let Some(&(ts, _)) = self.velocity_samples.front() {
96 if ts < window_start {
97 self.velocity_samples.pop_front();
98 } else {
99 break;
100 }
101 }
102 }
103
104 // Also cap the deque by sample COUNT. Pre-fix the
105 // bound was time-only, so at 100k events/s with a 100 ms
106 // velocity_window the deque could grow to ~10 000 entries
107 // before time-eviction caught up, costing ~240 KiB per
108 // shard for samples never used (calculate_velocity reads
109 // only `front()` and `back()`). Cap at
110 // VELOCITY_SAMPLES_CAP so the memory footprint is bounded
111 // regardless of throughput.
112 while self.velocity_samples.len() > VELOCITY_SAMPLES_CAP {
113 self.velocity_samples.pop_front();
114 }
115
116 // Recalculate batch size periodically (not on every call)
117 if now.duration_since(self.last_recalc) > Duration::from_millis(10) {
118 self.recalculate_batch_size();
119 self.last_recalc = now;
120 }
121
122 self.current_batch_size
123 }
124
125 /// Get the current target batch size.
126 #[inline]
127 pub fn batch_size(&self) -> usize {
128 self.current_batch_size
129 }
130
131 /// Calculate events per second based on recent samples.
132 #[expect(
133 clippy::unwrap_used,
134 reason = "len >= 2 guard above; .front() / .back() on a non-empty VecDeque is infallible"
135 )]
136 fn calculate_velocity(&self) -> f64 {
137 if self.velocity_samples.len() < 2 {
138 return 0.0;
139 }
140
141 let (oldest_ts, oldest_count) = *self.velocity_samples.front().unwrap();
142 let (newest_ts, newest_count) = *self.velocity_samples.back().unwrap();
143
144 let elapsed = newest_ts.duration_since(oldest_ts);
145 if elapsed.is_zero() {
146 return 0.0;
147 }
148
149 let events = newest_count.saturating_sub(oldest_count);
150 events as f64 / elapsed.as_secs_f64()
151 }
152
153 /// Recalculate the optimal batch size based on recent velocity.
154 fn recalculate_batch_size(&mut self) {
155 let velocity = self.calculate_velocity();
156
157 // Scale batch size with velocity
158 // At 1M events/sec → batch size ~5,000
159 // At 10M events/sec → batch size ~50,000 (capped at max)
160 //
161 // Explicit `clamp(0.0, usize::MAX as f64)` before the `as
162 // usize` cast: Rust's `as` cast on f64 → usize is
163 // saturating in current versions, but the explicit clamp
164 // documents intent and survives any future edition that
165 // tightens the cast (e.g. requires `try_from` on
166 // overflow). The `velocity > 0.0` guard above already
167 // rules out NaN and negative; the upper bound here only
168 // matters for the unreachable `velocity > usize::MAX *
169 // 200.0` case (~3.7e21 events/sec), but the saturation
170 // is cheaper than reasoning about future cast semantics.
171 let target = if velocity > 0.0 {
172 let scaled = (velocity / 200.0).clamp(0.0, usize::MAX as f64);
173 (scaled as usize).clamp(self.config.min_size, self.config.max_size)
174 } else {
175 self.config.min_size
176 };
177
178 // Smooth transitions using exponential moving average
179 // new = (old * 3 + target) / 4
180 //
181 // Saturating: `BatchConfig::validate` doesn't bound
182 // `max_size` from above, so a hostile config that pushes
183 // `current_batch_size` near `usize::MAX / 3` would
184 // overflow the multiply (debug: panic; release: wrap to a
185 // tiny value, collapsing the batcher to its `min_size`
186 // floor on the next clamp). Saturating preserves the
187 // intent — clamp at `usize::MAX` and let the bounds
188 // clamp below pull it back into the configured window.
189 self.current_batch_size = self
190 .current_batch_size
191 .saturating_mul(3)
192 .saturating_add(target)
193 / 4;
194
195 // Ensure we stay within bounds
196 self.current_batch_size = self
197 .current_batch_size
198 .clamp(self.config.min_size, self.config.max_size);
199 }
200
201 /// Reset the batcher state.
202 pub fn reset(&mut self) {
203 self.velocity_samples.clear();
204 self.total_events = 0;
205 self.current_batch_size = self.config.min_size;
206 self.last_recalc = Instant::now();
207 }
208}
209
210/// Batch worker state.
211///
212/// Manages batch assembly for a single shard.
213pub struct BatchWorker {
214 /// Shard ID.
215 shard_id: u16,
216 /// Adaptive batcher.
217 batcher: AdaptiveBatcher,
218 /// Current batch being assembled.
219 current_batch: Vec<InternalEvent>,
220 /// Sequence number for the next batch.
221 next_sequence: u64,
222 /// Mirror of `next_sequence` published to the bus, so
223 /// `EventBus::remove_shard_internal` can read the worker's
224 /// final post-flush sequence after awaiting the worker's
225 /// `JoinHandle`. Used as the `sequence_start` for the
226 /// stranded-ring-buffer flush so its msg-ids don't collide
227 /// with the worker's own first batch under JetStream's dedup
228 /// window.
229 ///
230 /// Updated on every successful `flush`. The hot path pays one
231 /// release-ordered atomic store per dispatched batch — the
232 /// per-batch dispatch already crosses an `await` so the
233 /// extra store is amortized away.
234 next_sequence_published: Arc<AtomicU64>,
235 /// Producer nonce stamped on every produced `Batch`.
236 ///
237 /// When the bus is configured with `producer_nonce_path`, this
238 /// is the persisted u64 from
239 /// `adapter::PersistentProducerNonce::load_or_create`. When
240 /// not configured, it falls back to the per-process nonce
241 /// from `event::batch_process_nonce`. Adapters that key dedup
242 /// on `(producer_nonce, shard, sequence_start, i)` (today:
243 /// JetStream `Nats-Msg-Id`, Redis `dedup_id` field) use this
244 /// to recognize cross-process retries.
245 producer_nonce: u64,
246 /// Time when the current batch started.
247 batch_start: Option<Instant>,
248 /// Configuration.
249 config: BatchConfig,
250}
251
252impl BatchWorker {
253 /// Create a new batch worker.
254 ///
255 /// `next_sequence_published` is the bus-owned mirror of
256 /// `next_sequence`. Pass `Arc::new(AtomicU64::new(0))` if the
257 /// caller doesn't need to observe the post-exit sequence;
258 /// production paths share it with `bus::remove_shard_internal`.
259 ///
260 /// `producer_nonce` is stamped on every produced `Batch` for
261 /// cross-process dedup. The bus passes its loaded nonce in;
262 /// tests can use any u64 (typically 0 or the per-process
263 /// default).
264 pub fn new(
265 shard_id: u16,
266 config: BatchConfig,
267 next_sequence_published: Arc<AtomicU64>,
268 producer_nonce: u64,
269 ) -> Self {
270 let capacity = config.max_size;
271 Self {
272 shard_id,
273 batcher: AdaptiveBatcher::new(config.clone()),
274 current_batch: Vec::with_capacity(capacity),
275 next_sequence: 0,
276 next_sequence_published,
277 producer_nonce,
278 batch_start: None,
279 config,
280 }
281 }
282
283 /// Add events to the current batch.
284 ///
285 /// Returns a completed batch if thresholds are met, or None if more events are needed.
286 ///
287 /// # Empty-input side effect
288 ///
289 /// Passing an empty `events` vec is **not** a no-op. The
290 /// BatchWorker's recv-timeout arm calls `add_events(vec![])`
291 /// specifically to drive a `check_timeout` round, which may
292 /// flush the in-memory `current_batch` if `max_delay` has
293 /// elapsed since the last event arrived. Callers who want
294 /// "true no-op on empty input" must check `events.is_empty()`
295 /// themselves before calling.
296 ///
297 /// Pre-fix this side effect was not documented and
298 /// surprised callers expecting `add_events([])` to be inert.
299 /// The fix is documentation only — the BatchWorker's timeout
300 /// flush relies on this behavior, so removing the side effect
301 /// would break the timeout-flush mechanism in bus.rs.
302 pub fn add_events(&mut self, events: Vec<InternalEvent>) -> Option<Batch> {
303 if events.is_empty() {
304 return self.check_timeout();
305 }
306
307 // Start batch timer if this is the first event
308 if self.current_batch.is_empty() {
309 self.batch_start = Some(Instant::now());
310 }
311
312 // Record events and get target batch size
313 let target_size = self.batcher.record_events(events.len());
314
315 // Add events to current batch
316 self.current_batch.extend(events);
317
318 // Check if we should flush
319 if self.current_batch.len() >= target_size {
320 return Some(self.flush());
321 }
322
323 // Check timeout
324 self.check_timeout()
325 }
326
327 /// Check if the batch should be flushed due to timeout.
328 ///
329 /// Pre-fix perf #38 in `docs/performance/net-perf-analysis.md`
330 /// this was private and the bus's timeout branch called
331 /// `add_events(vec![])` as the indirection — an empty `Vec`
332 /// allocation per timeout tick purely as a sentinel. The
333 /// method is now `pub` so callers can express the intent
334 /// directly without the alloc and without the documented
335 /// "empty `add_events` has a side effect" footgun.
336 pub fn check_timeout(&mut self) -> Option<Batch> {
337 if self.current_batch.is_empty() {
338 return None;
339 }
340
341 if let Some(start) = self.batch_start {
342 if start.elapsed() >= self.config.max_delay {
343 return Some(self.flush());
344 }
345 }
346
347 None
348 }
349
350 /// Force flush the current batch, even if thresholds aren't met.
351 pub fn flush(&mut self) -> Batch {
352 let events = std::mem::replace(
353 &mut self.current_batch,
354 Vec::with_capacity(self.config.max_size),
355 );
356
357 let sequence_start = self.next_sequence;
358 // `saturating_add` rather than `+=`: at u64 granularity this can
359 // only happen after wraparound (~584 years at 1 B events/s), but
360 // the wrap would silently corrupt sequence numbering. Saturating
361 // pins the counter at u64::MAX so downstream consumers see a
362 // monotonic, observable terminal state instead.
363 self.next_sequence = self.next_sequence.saturating_add(events.len() as u64);
364 // Publish the post-flush counter to the bus-owned mirror.
365 // `bus::remove_shard_internal` reads this after awaiting the
366 // worker's `JoinHandle` and uses it as the
367 // `sequence_start` for the stranded-ring-buffer flush — that
368 // guarantees the stranded msg-ids fall strictly past every
369 // msg-id this worker emitted, closing the JetStream-dedup
370 // collision risk.
371 self.next_sequence_published
372 .store(self.next_sequence, Ordering::Release);
373 self.batch_start = None;
374
375 Batch::with_nonce(self.shard_id, events, sequence_start, self.producer_nonce)
376 }
377
378 /// Check if there are pending events.
379 pub fn has_pending(&self) -> bool {
380 !self.current_batch.is_empty()
381 }
382
383 /// Get the number of pending events.
384 pub fn pending_count(&self) -> usize {
385 self.current_batch.len()
386 }
387
388 /// Get the current target batch size.
389 pub fn target_batch_size(&self) -> usize {
390 self.batcher.batch_size()
391 }
392
393 /// Get time until the current batch times out.
394 pub fn time_until_timeout(&self) -> Option<Duration> {
395 self.batch_start.map(|start| {
396 let elapsed = start.elapsed();
397 self.config.max_delay.saturating_sub(elapsed)
398 })
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build `count` synthetic events for `shard_id`; event `i` carries the
    /// payload `{"i": i}` and is created with `i` as its second argument.
    fn make_events(count: usize, shard_id: u16) -> Vec<InternalEvent> {
        (0..count)
            .map(|i| InternalEvent::from_value(json!({"i": i}), i as u64, shard_id))
            .collect()
    }

    /// Test helper — most tests don't observe the published sequence,
    /// they just need the third constructor argument.
    fn fresh_published() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    /// Build a `BatchConfig` with the 100 ms velocity window every test in
    /// this module uses.
    fn batch_config(
        min_size: usize,
        max_size: usize,
        max_delay: Duration,
        adaptive: bool,
    ) -> BatchConfig {
        BatchConfig {
            min_size,
            max_size,
            max_delay,
            adaptive,
            velocity_window: Duration::from_millis(100),
        }
    }

    /// Deadline used while asserting "nothing flushes yet". It is far
    /// longer than any plausible scheduler stall, so the pre-deadline
    /// assertions cannot fail spuriously on a loaded machine.
    const PATIENT_DELAY: Duration = Duration::from_secs(10);

    /// Shrink `worker`'s deadline to 1 ms and sleep well past it.
    ///
    /// `thread::sleep` never returns early, so after this call the pending
    /// batch is guaranteed to be past its `max_delay`.
    fn expire_deadline(worker: &mut BatchWorker) {
        worker.config.max_delay = Duration::from_millis(1);
        std::thread::sleep(Duration::from_millis(5));
    }

    #[test]
    fn test_batch_size_threshold() {
        let config = batch_config(10, 100, Duration::from_secs(10), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // 50 events: below the target (100 when adaptive = false).
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_none());
        assert_eq!(worker.pending_count(), 50);

        // 50 more reach the target and trigger the flush.
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_some());
        let batch = batch.unwrap();
        assert_eq!(batch.events.len(), 100);
        assert_eq!(batch.shard_id, 0);
    }

    /// `add_events(vec![])` is **not** a no-op. The activate-failure
    /// rollback path in `bus.rs` and the BatchWorker's recv-timeout
    /// arm both rely on the empty-input call to drive a
    /// `check_timeout`, which can flush `current_batch` if
    /// `max_delay` has elapsed. A future refactor that makes
    /// `add_events([])` a true no-op would silently lose those
    /// already-batched events on the rollback path. Pin the
    /// load-bearing behavior here so any such "cleanup" trips a
    /// failing test rather than producing a silent regression.
    #[test]
    fn add_events_empty_can_flush_via_timeout() {
        let config = batch_config(10, 1000, PATIENT_DELAY, false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Stage some events well below `min_size` so neither size
        // threshold can hide the timeout-flush.
        let pre = worker.add_events(make_events(3, 0));
        assert!(pre.is_none(), "below min_size — no flush yet");

        // Empty input *before* max_delay must be a no-op (returns
        // None). This pins the second half of the contract: the
        // side-effect is bounded to "check timeout", not "always
        // flush".
        let early = worker.add_events(vec![]);
        assert!(
            early.is_none(),
            "empty input before max_delay must NOT flush — \
             check_timeout returns None when start.elapsed() < max_delay"
        );

        // Move past the deadline and call with empty input — must flush.
        expire_deadline(&mut worker);
        let flushed = worker.add_events(vec![]);
        assert!(
            flushed.is_some(),
            "empty input after max_delay MUST flush via check_timeout — \
             this is the contract bus.rs and BatchWorker's recv-timeout \
             arm rely on; making it a no-op silently loses events on \
             the activate-failure rollback path"
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    /// Pin perf #38: the direct `check_timeout()` path produces
    /// the same outcomes as the legacy `add_events(vec![])` shape.
    /// `bus.rs`'s recv-timeout arm now calls `check_timeout`
    /// directly to avoid the empty-`Vec` allocation; the public
    /// contract on both shapes must agree so a future refactor
    /// can't drift them apart.
    #[test]
    fn check_timeout_matches_add_events_empty_signal() {
        let config = batch_config(10, 1000, PATIENT_DELAY, false);
        let mut w = BatchWorker::new(0, config, fresh_published(), 0);

        // Empty-batch case: both shapes return None.
        assert!(w.check_timeout().is_none(), "no pending → None");

        // Pending but pre-deadline: both shapes return None.
        let _ = w.add_events(make_events(3, 0));
        assert!(
            w.check_timeout().is_none(),
            "pending below threshold + pre-deadline → None",
        );

        // Pending past deadline: both shapes flush.
        expire_deadline(&mut w);
        let flushed = w.check_timeout();
        assert!(
            flushed.is_some(),
            "pending past max_delay → check_timeout must flush",
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    #[test]
    fn test_batch_timeout() {
        let config = batch_config(10, 1000, PATIENT_DELAY, false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Stage some events; nothing flushes before the deadline.
        let batch = worker.add_events(make_events(5, 0));
        assert!(batch.is_none());

        // Let the deadline pass.
        expire_deadline(&mut worker);

        // The timeout check now flushes the staged events.
        let batch = worker.add_events(vec![]);
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().events.len(), 5);
    }

    #[test]
    fn test_adaptive_batch_sizing() {
        let config = batch_config(100, 10_000, Duration::from_secs(10), true);
        let mut batcher = AdaptiveBatcher::new(config);

        // Initially should be at min_size.
        assert_eq!(batcher.batch_size(), 100);

        // Simulate high velocity (add lots of events quickly).
        //
        // The batcher only recalculates once more than 10 ms have passed
        // since the previous recalculation. 99 sleeps of at least 250 µs
        // precede the final `record_events` call, so at least ~24 ms have
        // elapsed by then and a recalculation is guaranteed regardless of
        // how precise the platform's sleep is.
        for _ in 0..100 {
            batcher.record_events(10_000);
            std::thread::sleep(Duration::from_micros(250));
        }

        // Batch size should have increased.
        assert!(batcher.batch_size() > 100);
    }

    /// Regression: `recalculate_batch_size` previously did
    /// `current_batch_size * 3 + target` with bare arithmetic. A
    /// hostile `BatchConfig` with `max_size` near `usize::MAX / 3`
    /// could push `current_batch_size` near that threshold, where
    /// the multiply overflows — debug build panics, release wraps.
    /// Pin the overflow-safe behavior so a future revert ("simplify"
    /// the arithmetic) is caught by the test rather than discovered
    /// in production via a debug-build crash.
    #[test]
    fn recalculate_batch_size_saturates_on_hostile_max_size() {
        let config = batch_config(1, usize::MAX, Duration::from_secs(10), true);
        let mut batcher = AdaptiveBatcher::new(config);

        // Drive `current_batch_size` to a value where `* 3` would
        // overflow. The field is module-private but we're in the
        // same module, so direct mutation is fine.
        batcher.current_batch_size = usize::MAX - 1;

        // Must neither panic nor wrap.
        batcher.recalculate_batch_size();

        // The result stays inside the configured window...
        assert!(
            batcher.current_batch_size >= 1,
            "post-recalc batch size must respect min_size, got {}",
            batcher.current_batch_size,
        );
        // ...and keeps at least a quarter of the previous size. Any
        // overflow-safe form of `(old * 3 + target) / 4` yields at least
        // `usize::MAX / 4` here; wrapping arithmetic lands just below it.
        assert!(
            batcher.current_batch_size >= usize::MAX / 4,
            "post-recalc batch size collapsed to {}",
            batcher.current_batch_size,
        );
    }

    #[test]
    fn test_force_flush() {
        let config = batch_config(100, 1000, Duration::from_secs(10), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events (below threshold).
        worker.add_events(make_events(50, 0));
        assert_eq!(worker.pending_count(), 50);

        // Force flush.
        let batch = worker.flush();
        assert_eq!(batch.events.len(), 50);
        assert!(!worker.has_pending());
    }

    #[test]
    fn test_sequence_numbers() {
        let mut worker = BatchWorker::new(0, BatchConfig::default(), fresh_published(), 0);

        // Each batch starts where the previous one ended.
        worker.add_events(make_events(100, 0));
        let batch1 = worker.flush();
        assert_eq!(batch1.sequence_start, 0);

        worker.add_events(make_events(50, 0));
        let batch2 = worker.flush();
        assert_eq!(batch2.sequence_start, 100);

        worker.add_events(make_events(25, 0));
        let batch3 = worker.flush();
        assert_eq!(batch3.sequence_start, 150);
    }

    /// Regression: every `flush` must publish the
    /// post-flush `next_sequence` to the shared atomic so
    /// `bus::remove_shard_internal` can read it after awaiting the
    /// worker and use it as the stranded-flush `sequence_start`.
    /// Pre-fix the stranded batch hardcoded 0, colliding with the
    /// worker's first batch under JetStream's dedup window.
    #[test]
    fn flush_publishes_post_flush_next_sequence_to_shared_atomic() {
        let published = fresh_published();
        let mut worker = BatchWorker::new(0, BatchConfig::default(), published.clone(), 0);

        // Pre-flush: atomic is at its initial value.
        assert_eq!(published.load(Ordering::Acquire), 0);

        worker.add_events(make_events(50, 0));
        let _ = worker.flush();

        assert_eq!(
            published.load(Ordering::Acquire),
            50,
            "post-flush atomic must mirror BatchWorker::next_sequence",
        );
    }

    /// Consecutive flushes keep the published atomic in lock-step
    /// with the internal counter — pin the addition (not just the
    /// initial set) so a future refactor that updates only on
    /// alternate flushes (or only when `events.is_empty()`) gets
    /// caught.
    #[test]
    fn flush_publishes_advance_consecutive_flushes() {
        let published = fresh_published();
        let mut worker = BatchWorker::new(0, BatchConfig::default(), published.clone(), 0);

        // (events added, expected published value after the flush)
        for (added, expected) in [(10usize, 10u64), (7, 17), (33, 50)] {
            worker.add_events(make_events(added, 0));
            let _ = worker.flush();
            assert_eq!(published.load(Ordering::Acquire), expected);
        }
    }

    /// Mirror the saturating-add overflow behavior on the published
    /// atomic. `bus::remove_shard_internal` uses this value as a
    /// `sequence_start`; if it ever overflowed back to 0 the
    /// stranded batch's msg-ids would collide with the worker's
    /// first batch — the exact JetStream-dedup hazard the
    /// stranded-flush path is designed to avoid.
    #[test]
    fn flush_publishes_saturating_max_on_overflow() {
        let published = fresh_published();
        let mut worker = BatchWorker::new(0, BatchConfig::default(), published.clone(), 0);

        worker.next_sequence = u64::MAX - 3;
        worker.add_events(make_events(10, 0));
        let _ = worker.flush();

        assert_eq!(worker.next_sequence, u64::MAX);
        assert_eq!(
            published.load(Ordering::Acquire),
            u64::MAX,
            "published atomic must saturate at u64::MAX, not wrap to 6",
        );
    }

    /// Regression: BUG_REPORT.md #19 — `next_sequence` previously used
    /// unchecked `+=`, which would silently wrap on overflow. Saturating
    /// pins it at `u64::MAX` so downstream consumers see a stable
    /// terminal state instead of restarting at 0.
    #[test]
    fn test_sequence_saturates_on_overflow() {
        let mut worker = BatchWorker::new(0, BatchConfig::default(), fresh_published(), 0);

        // Force the counter near overflow.
        worker.next_sequence = u64::MAX - 3;

        worker.add_events(make_events(10, 0));
        let batch = worker.flush();

        assert_eq!(batch.sequence_start, u64::MAX - 3);
        // Without saturation this would wrap to 6 and the next batch
        // would restart sequencing from there.
        assert_eq!(worker.next_sequence, u64::MAX);
    }
}
755}