net/shard/batch.rs
//! Batch aggregation with adaptive sizing.
//!
//! The batch worker continuously drains events from a shard's ring buffer,
//! assembles them into batches, and dispatches them to the adapter.
//!
//! # Adaptive Sizing
//!
//! Batch size is dynamically adjusted based on ingestion velocity:
//! - High velocity → larger batches → fewer adapter calls → higher throughput
//! - Low velocity → smaller batches → lower latency → faster flush
11
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use crate::config::BatchConfig;
18use crate::event::{Batch, InternalEvent};
19
/// Upper bound on the number of entries kept in
/// `AdaptiveBatcher::velocity_samples`, bounding per-shard memory use under
/// high throughput.
///
/// `calculate_velocity` reads only the oldest (`front()`) and newest
/// (`back()`) sample, so a denser history costs memory without improving the
/// estimate. 1 024 entries × ~24 bytes per `(Instant, u64)` tuple ≈ 24 KiB
/// per shard — well below the ~240 KiB pre-cap worst case at
/// 100 k events/s × 100 ms `velocity_window`.
const VELOCITY_SAMPLES_CAP: usize = 1024;
27
28/// Adaptive batch size calculator.
29///
30/// Tracks recent ingestion velocity and adjusts batch size accordingly.
31pub struct AdaptiveBatcher {
32 /// Configuration.
33 config: BatchConfig,
34 /// Current target batch size.
35 current_batch_size: usize,
36 /// Velocity samples: (timestamp, cumulative_count).
37 velocity_samples: VecDeque<(Instant, u64)>,
38 /// Total events seen (for velocity calculation).
39 total_events: u64,
40 /// Last recalculation time.
41 last_recalc: Instant,
42}
43
44impl AdaptiveBatcher {
45 /// Create a new adaptive batcher.
46 pub fn new(config: BatchConfig) -> Self {
47 Self {
48 current_batch_size: config.min_size,
49 velocity_samples: VecDeque::with_capacity(100),
50 total_events: 0,
51 last_recalc: Instant::now(),
52 config,
53 }
54 }
55
56 /// Record events and get the current target batch size.
57 ///
58 /// Call this each time events are drained from the ring buffer.
59 #[inline]
60 pub fn record_events(&mut self, count: usize) -> usize {
61 // Saturating-add: a stream that's ingested ~2^64 events
62 // is already in trouble, but a wrap from `u64::MAX` to a
63 // small value would interact with the
64 // `saturating_sub(oldest_count)` in `calculate_velocity`
65 // — the saturating-sub would underflow to 0 across the
66 // wraparound boundary and `velocity` would collapse to 0,
67 // forcing the batcher to its `min_size` floor right when
68 // sustained high throughput is exactly what the adaptive
69 // path was meant to handle. Saturating instead clamps at
70 // `u64::MAX` and `newest - oldest = 0` is the documented
71 // stop state.
72 self.total_events = self.total_events.saturating_add(count as u64);
73
74 if !self.config.adaptive {
75 return self.config.max_size;
76 }
77
78 let now = Instant::now();
79
80 // Add sample
81 self.velocity_samples.push_back((now, self.total_events));
82
83 // Remove old samples outside the time window.
84 //
85 // `Instant - Duration` panics on underflow, and on Windows
86 // `Instant` is QPC-relative to boot — a process that
87 // starts within `velocity_window` (typically a few
88 // seconds) of boot would abort the batch worker task
89 // here. `checked_sub` returns `None` on underflow; in
90 // that case skip the time-based eviction (every existing
91 // sample is "newer than the window floor" by definition,
92 // since the floor predates `Instant::now()`'s zero point).
93 // The sample-count cap below still bounds memory.
94 if let Some(window_start) = now.checked_sub(self.config.velocity_window) {
95 while let Some(&(ts, _)) = self.velocity_samples.front() {
96 if ts < window_start {
97 self.velocity_samples.pop_front();
98 } else {
99 break;
100 }
101 }
102 }
103
104 // Also cap the deque by sample COUNT. Pre-fix the
105 // bound was time-only, so at 100k events/s with a 100 ms
106 // velocity_window the deque could grow to ~10 000 entries
107 // before time-eviction caught up, costing ~240 KiB per
108 // shard for samples never used (calculate_velocity reads
109 // only `front()` and `back()`). Cap at
110 // VELOCITY_SAMPLES_CAP so the memory footprint is bounded
111 // regardless of throughput.
112 while self.velocity_samples.len() > VELOCITY_SAMPLES_CAP {
113 self.velocity_samples.pop_front();
114 }
115
116 // Recalculate batch size periodically (not on every call)
117 if now.duration_since(self.last_recalc) > Duration::from_millis(10) {
118 self.recalculate_batch_size();
119 self.last_recalc = now;
120 }
121
122 self.current_batch_size
123 }
124
125 /// Get the current target batch size.
126 #[inline]
127 pub fn batch_size(&self) -> usize {
128 self.current_batch_size
129 }
130
131 /// Calculate events per second based on recent samples.
132 fn calculate_velocity(&self) -> f64 {
133 if self.velocity_samples.len() < 2 {
134 return 0.0;
135 }
136
137 let (oldest_ts, oldest_count) = *self.velocity_samples.front().unwrap();
138 let (newest_ts, newest_count) = *self.velocity_samples.back().unwrap();
139
140 let elapsed = newest_ts.duration_since(oldest_ts);
141 if elapsed.is_zero() {
142 return 0.0;
143 }
144
145 let events = newest_count.saturating_sub(oldest_count);
146 events as f64 / elapsed.as_secs_f64()
147 }
148
149 /// Recalculate the optimal batch size based on recent velocity.
150 fn recalculate_batch_size(&mut self) {
151 let velocity = self.calculate_velocity();
152
153 // Scale batch size with velocity
154 // At 1M events/sec → batch size ~5,000
155 // At 10M events/sec → batch size ~50,000 (capped at max)
156 //
157 // Explicit `clamp(0.0, usize::MAX as f64)` before the `as
158 // usize` cast: Rust's `as` cast on f64 → usize is
159 // saturating in current versions, but the explicit clamp
160 // documents intent and survives any future edition that
161 // tightens the cast (e.g. requires `try_from` on
162 // overflow). The `velocity > 0.0` guard above already
163 // rules out NaN and negative; the upper bound here only
164 // matters for the unreachable `velocity > usize::MAX *
165 // 200.0` case (~3.7e21 events/sec), but the saturation
166 // is cheaper than reasoning about future cast semantics.
167 let target = if velocity > 0.0 {
168 let scaled = (velocity / 200.0).clamp(0.0, usize::MAX as f64);
169 (scaled as usize).clamp(self.config.min_size, self.config.max_size)
170 } else {
171 self.config.min_size
172 };
173
174 // Smooth transitions using exponential moving average
175 // new = (old * 3 + target) / 4
176 //
177 // Saturating: `BatchConfig::validate` doesn't bound
178 // `max_size` from above, so a hostile config that pushes
179 // `current_batch_size` near `usize::MAX / 3` would
180 // overflow the multiply (debug: panic; release: wrap to a
181 // tiny value, collapsing the batcher to its `min_size`
182 // floor on the next clamp). Saturating preserves the
183 // intent — clamp at `usize::MAX` and let the bounds
184 // clamp below pull it back into the configured window.
185 self.current_batch_size = self
186 .current_batch_size
187 .saturating_mul(3)
188 .saturating_add(target)
189 / 4;
190
191 // Ensure we stay within bounds
192 self.current_batch_size = self
193 .current_batch_size
194 .clamp(self.config.min_size, self.config.max_size);
195 }
196
197 /// Reset the batcher state.
198 pub fn reset(&mut self) {
199 self.velocity_samples.clear();
200 self.total_events = 0;
201 self.current_batch_size = self.config.min_size;
202 self.last_recalc = Instant::now();
203 }
204}
205
206/// Batch worker state.
207///
208/// Manages batch assembly for a single shard.
209pub struct BatchWorker {
210 /// Shard ID.
211 shard_id: u16,
212 /// Adaptive batcher.
213 batcher: AdaptiveBatcher,
214 /// Current batch being assembled.
215 current_batch: Vec<InternalEvent>,
216 /// Sequence number for the next batch.
217 next_sequence: u64,
218 /// Mirror of `next_sequence` published to the bus, so
219 /// `EventBus::remove_shard_internal` can read the worker's
220 /// final post-flush sequence after awaiting the worker's
221 /// `JoinHandle`. Used as the `sequence_start` for the
222 /// stranded-ring-buffer flush so its msg-ids don't collide
223 /// with the worker's own first batch under JetStream's dedup
224 /// window.
225 ///
226 /// Updated on every successful `flush`. The hot path pays one
227 /// release-ordered atomic store per dispatched batch — the
228 /// per-batch dispatch already crosses an `await` so the
229 /// extra store is amortized away.
230 next_sequence_published: Arc<AtomicU64>,
231 /// Producer nonce stamped on every produced `Batch`.
232 ///
233 /// When the bus is configured with `producer_nonce_path`, this
234 /// is the persisted u64 from
235 /// `adapter::PersistentProducerNonce::load_or_create`. When
236 /// not configured, it falls back to the per-process nonce
237 /// from `event::batch_process_nonce`. Adapters that key dedup
238 /// on `(producer_nonce, shard, sequence_start, i)` (today:
239 /// JetStream `Nats-Msg-Id`, Redis `dedup_id` field) use this
240 /// to recognize cross-process retries.
241 producer_nonce: u64,
242 /// Time when the current batch started.
243 batch_start: Option<Instant>,
244 /// Configuration.
245 config: BatchConfig,
246}
247
248impl BatchWorker {
249 /// Create a new batch worker.
250 ///
251 /// `next_sequence_published` is the bus-owned mirror of
252 /// `next_sequence`. Pass `Arc::new(AtomicU64::new(0))` if the
253 /// caller doesn't need to observe the post-exit sequence;
254 /// production paths share it with `bus::remove_shard_internal`.
255 ///
256 /// `producer_nonce` is stamped on every produced `Batch` for
257 /// cross-process dedup. The bus passes its loaded nonce in;
258 /// tests can use any u64 (typically 0 or the per-process
259 /// default).
260 pub fn new(
261 shard_id: u16,
262 config: BatchConfig,
263 next_sequence_published: Arc<AtomicU64>,
264 producer_nonce: u64,
265 ) -> Self {
266 let capacity = config.max_size;
267 Self {
268 shard_id,
269 batcher: AdaptiveBatcher::new(config.clone()),
270 current_batch: Vec::with_capacity(capacity),
271 next_sequence: 0,
272 next_sequence_published,
273 producer_nonce,
274 batch_start: None,
275 config,
276 }
277 }
278
279 /// Add events to the current batch.
280 ///
281 /// Returns a completed batch if thresholds are met, or None if more events are needed.
282 ///
283 /// # Empty-input side effect
284 ///
285 /// Passing an empty `events` vec is **not** a no-op. The
286 /// BatchWorker's recv-timeout arm calls `add_events(vec![])`
287 /// specifically to drive a `check_timeout` round, which may
288 /// flush the in-memory `current_batch` if `max_delay` has
289 /// elapsed since the last event arrived. Callers who want
290 /// "true no-op on empty input" must check `events.is_empty()`
291 /// themselves before calling.
292 ///
293 /// Pre-fix this side effect was not documented and
294 /// surprised callers expecting `add_events([])` to be inert.
295 /// The fix is documentation only — the BatchWorker's timeout
296 /// flush relies on this behavior, so removing the side effect
297 /// would break the timeout-flush mechanism in bus.rs.
298 pub fn add_events(&mut self, events: Vec<InternalEvent>) -> Option<Batch> {
299 if events.is_empty() {
300 return self.check_timeout();
301 }
302
303 // Start batch timer if this is the first event
304 if self.current_batch.is_empty() {
305 self.batch_start = Some(Instant::now());
306 }
307
308 // Record events and get target batch size
309 let target_size = self.batcher.record_events(events.len());
310
311 // Add events to current batch
312 self.current_batch.extend(events);
313
314 // Check if we should flush
315 if self.current_batch.len() >= target_size {
316 return Some(self.flush());
317 }
318
319 // Check timeout
320 self.check_timeout()
321 }
322
323 /// Check if the batch should be flushed due to timeout.
324 fn check_timeout(&mut self) -> Option<Batch> {
325 if self.current_batch.is_empty() {
326 return None;
327 }
328
329 if let Some(start) = self.batch_start {
330 if start.elapsed() >= self.config.max_delay {
331 return Some(self.flush());
332 }
333 }
334
335 None
336 }
337
338 /// Force flush the current batch, even if thresholds aren't met.
339 pub fn flush(&mut self) -> Batch {
340 let events = std::mem::replace(
341 &mut self.current_batch,
342 Vec::with_capacity(self.config.max_size),
343 );
344
345 let sequence_start = self.next_sequence;
346 // `saturating_add` rather than `+=`: at u64 granularity this can
347 // only happen after wraparound (~584 years at 1 B events/s), but
348 // the wrap would silently corrupt sequence numbering. Saturating
349 // pins the counter at u64::MAX so downstream consumers see a
350 // monotonic, observable terminal state instead.
351 self.next_sequence = self.next_sequence.saturating_add(events.len() as u64);
352 // Publish the post-flush counter to the bus-owned mirror.
353 // `bus::remove_shard_internal` reads this after awaiting the
354 // worker's `JoinHandle` and uses it as the
355 // `sequence_start` for the stranded-ring-buffer flush — that
356 // guarantees the stranded msg-ids fall strictly past every
357 // msg-id this worker emitted, closing the JetStream-dedup
358 // collision risk.
359 self.next_sequence_published
360 .store(self.next_sequence, Ordering::Release);
361 self.batch_start = None;
362
363 Batch::with_nonce(self.shard_id, events, sequence_start, self.producer_nonce)
364 }
365
366 /// Check if there are pending events.
367 pub fn has_pending(&self) -> bool {
368 !self.current_batch.is_empty()
369 }
370
371 /// Get the number of pending events.
372 pub fn pending_count(&self) -> usize {
373 self.current_batch.len()
374 }
375
376 /// Get the current target batch size.
377 pub fn target_batch_size(&self) -> usize {
378 self.batcher.batch_size()
379 }
380
381 /// Get time until the current batch times out.
382 pub fn time_until_timeout(&self) -> Option<Duration> {
383 self.batch_start.map(|start| {
384 let elapsed = start.elapsed();
385 self.config.max_delay.saturating_sub(elapsed)
386 })
387 }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build `count` events for `shard_id`, each carrying its index as the
    /// JSON payload.
    fn make_events(count: usize, shard_id: u16) -> Vec<InternalEvent> {
        (0..count)
            .map(|i| InternalEvent::from_value(json!({"i": i}), i as u64, shard_id))
            .collect()
    }

    /// Test helper — most tests don't observe the published sequence,
    /// they just need the third arg.
    fn fresh_published() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    #[test]
    fn test_batch_size_threshold() {
        let config = BatchConfig {
            min_size: 10,
            max_size: 100,
            max_delay: Duration::from_secs(10),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add 50 events - should not trigger flush (target is 100 when adaptive=false)
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_none());
        assert_eq!(worker.pending_count(), 50);

        // Add 50 more - should trigger flush
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_some());
        let batch = batch.unwrap();
        assert_eq!(batch.events.len(), 100);
        assert_eq!(batch.shard_id, 0);
    }

    /// `add_events(vec![])` is **not** a no-op. The activate-failure
    /// rollback path in `bus.rs` and the BatchWorker's recv-timeout arm
    /// both rely on the empty-input call to drive a `check_timeout`, which
    /// can flush `current_batch` if `max_delay` has elapsed. A future
    /// refactor that makes `add_events([])` a true no-op would silently
    /// lose those already-batched events on the rollback path. Pin the
    /// load-bearing behavior here so any such "cleanup" trips a failing
    /// test rather than producing a silent regression.
    #[test]
    fn add_events_empty_can_flush_via_timeout() {
        let config = BatchConfig {
            min_size: 10,
            max_size: 1000,
            // Generous deadline for the "must NOT flush yet" half, so a
            // descheduled test thread cannot trip the timeout early. It is
            // tightened below for the "must flush" half.
            max_delay: Duration::from_secs(60),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Stage some events well below the size threshold (`max_size`,
        // because `adaptive` is off) so it can't hide the timeout-flush.
        let pre = worker.add_events(make_events(3, 0));
        assert!(pre.is_none(), "below the size threshold — no flush yet");

        // Empty input *before* max_delay must be a no-op (returns None).
        // This pins the second half of the contract: the side-effect is
        // bounded to "check timeout", not "always flush".
        let early = worker.add_events(vec![]);
        assert!(
            early.is_none(),
            "empty input before max_delay must NOT flush — \
             check_timeout returns None when start.elapsed() < max_delay"
        );

        // Tighten the deadline (private field, same module), wait past it
        // and call with empty input — must flush.
        worker.config.max_delay = Duration::from_millis(1);
        std::thread::sleep(Duration::from_millis(5));
        let flushed = worker.add_events(vec![]);
        assert!(
            flushed.is_some(),
            "empty input after max_delay MUST flush via check_timeout — \
             this is the contract bus.rs and BatchWorker's recv-timeout \
             arm rely on; making it a no-op silently loses events on \
             the activate-failure rollback path"
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    #[test]
    fn test_batch_timeout() {
        let config = BatchConfig {
            min_size: 10,
            max_size: 1000,
            max_delay: Duration::from_millis(1),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events
        let batch = worker.add_events(make_events(5, 0));
        assert!(batch.is_none());

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(5));

        // Check timeout triggers flush
        let batch = worker.add_events(vec![]);
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().events.len(), 5);
    }

    #[test]
    fn test_adaptive_batch_sizing() {
        let config = BatchConfig {
            min_size: 100,
            max_size: 10_000,
            max_delay: Duration::from_secs(10),
            adaptive: true,
            velocity_window: Duration::from_millis(100),
        };

        let mut batcher = AdaptiveBatcher::new(config);

        // Initially should be at min_size
        assert_eq!(batcher.batch_size(), 100);

        // Simulate high velocity (add lots of events quickly)
        for _ in 0..100 {
            batcher.record_events(10_000);
            std::thread::sleep(Duration::from_micros(100));
        }

        // Batch size should have increased
        assert!(batcher.batch_size() > 100);
    }

    /// Regression: `recalculate_batch_size` previously did
    /// `current_batch_size * 3 + target` with bare arithmetic. A hostile
    /// `BatchConfig` with `max_size` near `usize::MAX / 3` could push
    /// `current_batch_size` near that threshold, where the multiply
    /// overflows — debug build panics, release wraps. The fix makes the
    /// arithmetic overflow-safe. Pin that so a future revert ("simplify"
    /// the arithmetic) is caught by the test rather than discovered in
    /// production via a debug-build crash.
    #[test]
    fn recalculate_batch_size_saturates_on_hostile_max_size() {
        let config = BatchConfig {
            min_size: 1,
            max_size: usize::MAX,
            max_delay: Duration::from_secs(10),
            adaptive: true,
            velocity_window: Duration::from_millis(100),
        };
        let mut batcher = AdaptiveBatcher::new(config);

        // Drive `current_batch_size` to a value where `* 3` would
        // overflow. The field is module-private but we're in the same
        // module, so direct mutation is fine.
        batcher.current_batch_size = usize::MAX - 1;

        // Pre-fix this would either debug-panic (`overflow when
        // multiplying`) or release-wrap. Post-fix the arithmetic cannot
        // overflow and the bounds clamp keeps the result inside
        // [min_size, max_size]. Either way: no panic.
        batcher.recalculate_batch_size();

        // Sanity: the resulting size is still inside the configured window.
        assert!(
            batcher.current_batch_size >= 1,
            "post-recalc batch size must respect min_size, got {}",
            batcher.current_batch_size,
        );
    }

    #[test]
    fn test_force_flush() {
        let config = BatchConfig {
            min_size: 100,
            max_size: 1000,
            max_delay: Duration::from_secs(10),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events (below threshold)
        worker.add_events(make_events(50, 0));
        assert_eq!(worker.pending_count(), 50);

        // Force flush
        let batch = worker.flush();
        assert_eq!(batch.events.len(), 50);
        assert!(!worker.has_pending());
    }

    #[test]
    fn test_sequence_numbers() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config.clone(), fresh_published(), 0);

        // Create batches and verify sequence numbers
        worker.add_events(make_events(100, 0));
        let batch1 = worker.flush();
        assert_eq!(batch1.sequence_start, 0);

        worker.add_events(make_events(50, 0));
        let batch2 = worker.flush();
        assert_eq!(batch2.sequence_start, 100);

        worker.add_events(make_events(25, 0));
        let batch3 = worker.flush();
        assert_eq!(batch3.sequence_start, 150);
    }

    /// Regression: every `flush` must publish the post-flush
    /// `next_sequence` to the shared atomic so
    /// `bus::remove_shard_internal` can read it after awaiting the worker
    /// and use it as the stranded-flush `sequence_start`. Pre-fix the
    /// stranded batch hardcoded 0, colliding with the worker's first batch
    /// under JetStream's dedup window.
    #[test]
    fn flush_publishes_post_flush_next_sequence_to_shared_atomic() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        // Pre-flush: atomic is at its initial value.
        assert_eq!(published.load(Ordering::Acquire), 0);

        worker.add_events(make_events(50, 0));
        let _ = worker.flush();

        assert_eq!(
            published.load(Ordering::Acquire),
            50,
            "post-flush atomic must mirror BatchWorker::next_sequence",
        );
    }

    /// Consecutive flushes keep the published atomic in lock-step with the
    /// internal counter — pin the addition (not just the initial set) so a
    /// future refactor that updates only on alternate flushes (or only
    /// when `events.is_empty()`) gets caught.
    #[test]
    fn flush_publishes_advance_consecutive_flushes() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.add_events(make_events(10, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 10);

        worker.add_events(make_events(7, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 17);

        worker.add_events(make_events(33, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 50);
    }

    /// Mirror the saturating-add overflow behavior on the published
    /// atomic. `bus::remove_shard_internal` uses this value as a
    /// `sequence_start`; if it ever overflowed back to 0 the stranded
    /// batch's msg-ids would collide with the worker's first batch — the
    /// exact JetStream-dedup hazard the stranded-flush path is designed to
    /// avoid.
    #[test]
    fn flush_publishes_saturating_max_on_overflow() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.next_sequence = u64::MAX - 3;
        worker.add_events(make_events(10, 0));
        let _ = worker.flush();

        assert_eq!(worker.next_sequence, u64::MAX);
        assert_eq!(
            published.load(Ordering::Acquire),
            u64::MAX,
            "published atomic must saturate at u64::MAX, not wrap to 6",
        );
    }

    /// Regression: BUG_REPORT.md #19 — `next_sequence` previously used
    /// unchecked `+=`, which would silently wrap on overflow. Saturating
    /// pins it at `u64::MAX` so downstream consumers see a stable terminal
    /// state instead of restarting at 0.
    #[test]
    fn test_sequence_saturates_on_overflow() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Force the counter near overflow.
        worker.next_sequence = u64::MAX - 3;

        worker.add_events(make_events(10, 0));
        let batch = worker.flush();

        assert_eq!(batch.sequence_start, u64::MAX - 3);
        // Without saturation this would wrap to 6 and the next batch
        // would restart sequencing from there.
        assert_eq!(worker.next_sequence, u64::MAX);
    }
}