Skip to main content

net/shard/
batch.rs

1//! Batch aggregation with adaptive sizing.
2//!
3//! The batch worker continuously drains events from a shard's ring buffer,
4//! assembles them into batches, and dispatches them to the adapter.
5//!
6//! # Adaptive Sizing
7//!
8//! Batch size is dynamically adjusted based on ingestion velocity:
9//! - High velocity → larger batches → fewer adapter calls → higher throughput
10//! - Low velocity → smaller batches → lower latency → faster flush
11
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use crate::config::BatchConfig;
18use crate::event::{Batch, InternalEvent};
19
/// Upper bound on the number of entries retained in
/// `AdaptiveBatcher::velocity_samples`.
///
/// Velocity is derived from the oldest and the newest retained
/// sample only, so a longer history adds memory cost without adding
/// precision. At roughly 24 bytes per `(Instant, u64)` tuple, 1 024
/// entries come to about 24 KiB per shard, versus the ~240 KiB a
/// purely time-bounded deque could reach at 100 k events/s with a
/// 100 ms `velocity_window`.
const VELOCITY_SAMPLES_CAP: usize = 1_024;
27
28/// Adaptive batch size calculator.
29///
30/// Tracks recent ingestion velocity and adjusts batch size accordingly.
31pub struct AdaptiveBatcher {
32    /// Configuration.
33    config: BatchConfig,
34    /// Current target batch size.
35    current_batch_size: usize,
36    /// Velocity samples: (timestamp, cumulative_count).
37    velocity_samples: VecDeque<(Instant, u64)>,
38    /// Total events seen (for velocity calculation).
39    total_events: u64,
40    /// Last recalculation time.
41    last_recalc: Instant,
42}
43
44impl AdaptiveBatcher {
45    /// Create a new adaptive batcher.
46    pub fn new(config: BatchConfig) -> Self {
47        Self {
48            current_batch_size: config.min_size,
49            velocity_samples: VecDeque::with_capacity(100),
50            total_events: 0,
51            last_recalc: Instant::now(),
52            config,
53        }
54    }
55
56    /// Record events and get the current target batch size.
57    ///
58    /// Call this each time events are drained from the ring buffer.
59    #[inline]
60    pub fn record_events(&mut self, count: usize) -> usize {
61        // Saturating-add: a stream that's ingested ~2^64 events
62        // is already in trouble, but a wrap from `u64::MAX` to a
63        // small value would interact with the
64        // `saturating_sub(oldest_count)` in `calculate_velocity`
65        // — the saturating-sub would underflow to 0 across the
66        // wraparound boundary and `velocity` would collapse to 0,
67        // forcing the batcher to its `min_size` floor right when
68        // sustained high throughput is exactly what the adaptive
69        // path was meant to handle. Saturating instead clamps at
70        // `u64::MAX` and `newest - oldest = 0` is the documented
71        // stop state.
72        self.total_events = self.total_events.saturating_add(count as u64);
73
74        if !self.config.adaptive {
75            return self.config.max_size;
76        }
77
78        let now = Instant::now();
79
80        // Add sample
81        self.velocity_samples.push_back((now, self.total_events));
82
83        // Remove old samples outside the time window.
84        //
85        // `Instant - Duration` panics on underflow, and on Windows
86        // `Instant` is QPC-relative to boot — a process that
87        // starts within `velocity_window` (typically a few
88        // seconds) of boot would abort the batch worker task
89        // here. `checked_sub` returns `None` on underflow; in
90        // that case skip the time-based eviction (every existing
91        // sample is "newer than the window floor" by definition,
92        // since the floor predates `Instant::now()`'s zero point).
93        // The sample-count cap below still bounds memory.
94        if let Some(window_start) = now.checked_sub(self.config.velocity_window) {
95            while let Some(&(ts, _)) = self.velocity_samples.front() {
96                if ts < window_start {
97                    self.velocity_samples.pop_front();
98                } else {
99                    break;
100                }
101            }
102        }
103
104        // Also cap the deque by sample COUNT. Pre-fix the
105        // bound was time-only, so at 100k events/s with a 100 ms
106        // velocity_window the deque could grow to ~10 000 entries
107        // before time-eviction caught up, costing ~240 KiB per
108        // shard for samples never used (calculate_velocity reads
109        // only `front()` and `back()`). Cap at
110        // VELOCITY_SAMPLES_CAP so the memory footprint is bounded
111        // regardless of throughput.
112        while self.velocity_samples.len() > VELOCITY_SAMPLES_CAP {
113            self.velocity_samples.pop_front();
114        }
115
116        // Recalculate batch size periodically (not on every call)
117        if now.duration_since(self.last_recalc) > Duration::from_millis(10) {
118            self.recalculate_batch_size();
119            self.last_recalc = now;
120        }
121
122        self.current_batch_size
123    }
124
125    /// Get the current target batch size.
126    #[inline]
127    pub fn batch_size(&self) -> usize {
128        self.current_batch_size
129    }
130
131    /// Calculate events per second based on recent samples.
132    fn calculate_velocity(&self) -> f64 {
133        if self.velocity_samples.len() < 2 {
134            return 0.0;
135        }
136
137        let (oldest_ts, oldest_count) = *self.velocity_samples.front().unwrap();
138        let (newest_ts, newest_count) = *self.velocity_samples.back().unwrap();
139
140        let elapsed = newest_ts.duration_since(oldest_ts);
141        if elapsed.is_zero() {
142            return 0.0;
143        }
144
145        let events = newest_count.saturating_sub(oldest_count);
146        events as f64 / elapsed.as_secs_f64()
147    }
148
149    /// Recalculate the optimal batch size based on recent velocity.
150    fn recalculate_batch_size(&mut self) {
151        let velocity = self.calculate_velocity();
152
153        // Scale batch size with velocity
154        // At 1M events/sec → batch size ~5,000
155        // At 10M events/sec → batch size ~50,000 (capped at max)
156        //
157        // Explicit `clamp(0.0, usize::MAX as f64)` before the `as
158        // usize` cast: Rust's `as` cast on f64 → usize is
159        // saturating in current versions, but the explicit clamp
160        // documents intent and survives any future edition that
161        // tightens the cast (e.g. requires `try_from` on
162        // overflow). The `velocity > 0.0` guard above already
163        // rules out NaN and negative; the upper bound here only
164        // matters for the unreachable `velocity > usize::MAX *
165        // 200.0` case (~3.7e21 events/sec), but the saturation
166        // is cheaper than reasoning about future cast semantics.
167        let target = if velocity > 0.0 {
168            let scaled = (velocity / 200.0).clamp(0.0, usize::MAX as f64);
169            (scaled as usize).clamp(self.config.min_size, self.config.max_size)
170        } else {
171            self.config.min_size
172        };
173
174        // Smooth transitions using exponential moving average
175        // new = (old * 3 + target) / 4
176        //
177        // Saturating: `BatchConfig::validate` doesn't bound
178        // `max_size` from above, so a hostile config that pushes
179        // `current_batch_size` near `usize::MAX / 3` would
180        // overflow the multiply (debug: panic; release: wrap to a
181        // tiny value, collapsing the batcher to its `min_size`
182        // floor on the next clamp). Saturating preserves the
183        // intent — clamp at `usize::MAX` and let the bounds
184        // clamp below pull it back into the configured window.
185        self.current_batch_size = self
186            .current_batch_size
187            .saturating_mul(3)
188            .saturating_add(target)
189            / 4;
190
191        // Ensure we stay within bounds
192        self.current_batch_size = self
193            .current_batch_size
194            .clamp(self.config.min_size, self.config.max_size);
195    }
196
197    /// Reset the batcher state.
198    pub fn reset(&mut self) {
199        self.velocity_samples.clear();
200        self.total_events = 0;
201        self.current_batch_size = self.config.min_size;
202        self.last_recalc = Instant::now();
203    }
204}
205
206/// Batch worker state.
207///
208/// Manages batch assembly for a single shard.
209pub struct BatchWorker {
210    /// Shard ID.
211    shard_id: u16,
212    /// Adaptive batcher.
213    batcher: AdaptiveBatcher,
214    /// Current batch being assembled.
215    current_batch: Vec<InternalEvent>,
216    /// Sequence number for the next batch.
217    next_sequence: u64,
218    /// Mirror of `next_sequence` published to the bus, so
219    /// `EventBus::remove_shard_internal` can read the worker's
220    /// final post-flush sequence after awaiting the worker's
221    /// `JoinHandle`. Used as the `sequence_start` for the
222    /// stranded-ring-buffer flush so its msg-ids don't collide
223    /// with the worker's own first batch under JetStream's dedup
224    /// window.
225    ///
226    /// Updated on every successful `flush`. The hot path pays one
227    /// release-ordered atomic store per dispatched batch — the
228    /// per-batch dispatch already crosses an `await` so the
229    /// extra store is amortized away.
230    next_sequence_published: Arc<AtomicU64>,
231    /// Producer nonce stamped on every produced `Batch`.
232    ///
233    /// When the bus is configured with `producer_nonce_path`, this
234    /// is the persisted u64 from
235    /// `adapter::PersistentProducerNonce::load_or_create`. When
236    /// not configured, it falls back to the per-process nonce
237    /// from `event::batch_process_nonce`. Adapters that key dedup
238    /// on `(producer_nonce, shard, sequence_start, i)` (today:
239    /// JetStream `Nats-Msg-Id`, Redis `dedup_id` field) use this
240    /// to recognize cross-process retries.
241    producer_nonce: u64,
242    /// Time when the current batch started.
243    batch_start: Option<Instant>,
244    /// Configuration.
245    config: BatchConfig,
246}
247
248impl BatchWorker {
249    /// Create a new batch worker.
250    ///
251    /// `next_sequence_published` is the bus-owned mirror of
252    /// `next_sequence`. Pass `Arc::new(AtomicU64::new(0))` if the
253    /// caller doesn't need to observe the post-exit sequence;
254    /// production paths share it with `bus::remove_shard_internal`.
255    ///
256    /// `producer_nonce` is stamped on every produced `Batch` for
257    /// cross-process dedup. The bus passes its loaded nonce in;
258    /// tests can use any u64 (typically 0 or the per-process
259    /// default).
260    pub fn new(
261        shard_id: u16,
262        config: BatchConfig,
263        next_sequence_published: Arc<AtomicU64>,
264        producer_nonce: u64,
265    ) -> Self {
266        let capacity = config.max_size;
267        Self {
268            shard_id,
269            batcher: AdaptiveBatcher::new(config.clone()),
270            current_batch: Vec::with_capacity(capacity),
271            next_sequence: 0,
272            next_sequence_published,
273            producer_nonce,
274            batch_start: None,
275            config,
276        }
277    }
278
279    /// Add events to the current batch.
280    ///
281    /// Returns a completed batch if thresholds are met, or None if more events are needed.
282    ///
283    /// # Empty-input side effect
284    ///
285    /// Passing an empty `events` vec is **not** a no-op. The
286    /// BatchWorker's recv-timeout arm calls `add_events(vec![])`
287    /// specifically to drive a `check_timeout` round, which may
288    /// flush the in-memory `current_batch` if `max_delay` has
289    /// elapsed since the last event arrived. Callers who want
290    /// "true no-op on empty input" must check `events.is_empty()`
291    /// themselves before calling.
292    ///
293    /// Pre-fix this side effect was not documented and
294    /// surprised callers expecting `add_events([])` to be inert.
295    /// The fix is documentation only — the BatchWorker's timeout
296    /// flush relies on this behavior, so removing the side effect
297    /// would break the timeout-flush mechanism in bus.rs.
298    pub fn add_events(&mut self, events: Vec<InternalEvent>) -> Option<Batch> {
299        if events.is_empty() {
300            return self.check_timeout();
301        }
302
303        // Start batch timer if this is the first event
304        if self.current_batch.is_empty() {
305            self.batch_start = Some(Instant::now());
306        }
307
308        // Record events and get target batch size
309        let target_size = self.batcher.record_events(events.len());
310
311        // Add events to current batch
312        self.current_batch.extend(events);
313
314        // Check if we should flush
315        if self.current_batch.len() >= target_size {
316            return Some(self.flush());
317        }
318
319        // Check timeout
320        self.check_timeout()
321    }
322
323    /// Check if the batch should be flushed due to timeout.
324    fn check_timeout(&mut self) -> Option<Batch> {
325        if self.current_batch.is_empty() {
326            return None;
327        }
328
329        if let Some(start) = self.batch_start {
330            if start.elapsed() >= self.config.max_delay {
331                return Some(self.flush());
332            }
333        }
334
335        None
336    }
337
338    /// Force flush the current batch, even if thresholds aren't met.
339    pub fn flush(&mut self) -> Batch {
340        let events = std::mem::replace(
341            &mut self.current_batch,
342            Vec::with_capacity(self.config.max_size),
343        );
344
345        let sequence_start = self.next_sequence;
346        // `saturating_add` rather than `+=`: at u64 granularity this can
347        // only happen after wraparound (~584 years at 1 B events/s), but
348        // the wrap would silently corrupt sequence numbering. Saturating
349        // pins the counter at u64::MAX so downstream consumers see a
350        // monotonic, observable terminal state instead.
351        self.next_sequence = self.next_sequence.saturating_add(events.len() as u64);
352        // Publish the post-flush counter to the bus-owned mirror.
353        // `bus::remove_shard_internal` reads this after awaiting the
354        // worker's `JoinHandle` and uses it as the
355        // `sequence_start` for the stranded-ring-buffer flush — that
356        // guarantees the stranded msg-ids fall strictly past every
357        // msg-id this worker emitted, closing the JetStream-dedup
358        // collision risk.
359        self.next_sequence_published
360            .store(self.next_sequence, Ordering::Release);
361        self.batch_start = None;
362
363        Batch::with_nonce(self.shard_id, events, sequence_start, self.producer_nonce)
364    }
365
366    /// Check if there are pending events.
367    pub fn has_pending(&self) -> bool {
368        !self.current_batch.is_empty()
369    }
370
371    /// Get the number of pending events.
372    pub fn pending_count(&self) -> usize {
373        self.current_batch.len()
374    }
375
376    /// Get the current target batch size.
377    pub fn target_batch_size(&self) -> usize {
378        self.batcher.batch_size()
379    }
380
381    /// Get time until the current batch times out.
382    pub fn time_until_timeout(&self) -> Option<Duration> {
383        self.batch_start.map(|start| {
384            let elapsed = start.elapsed();
385            self.config.max_delay.saturating_sub(elapsed)
386        })
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build `count` events for `shard_id`, numbered from 0.
    fn make_events(count: usize, shard_id: u16) -> Vec<InternalEvent> {
        (0..count)
            .map(|i| InternalEvent::from_value(json!({"i": i}), i as u64, shard_id))
            .collect()
    }

    /// Test helper — most tests don't observe the published sequence,
    /// they just need the third constructor argument.
    fn fresh_published() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    #[test]
    fn test_batch_size_threshold() {
        let config = BatchConfig {
            min_size: 10,
            max_size: 100,
            max_delay: Duration::from_secs(10),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add 50 events - should not trigger flush (target is 100 when adaptive=false)
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_none());
        assert_eq!(worker.pending_count(), 50);

        // Add 50 more - should trigger flush
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_some());
        let batch = batch.unwrap();
        assert_eq!(batch.events.len(), 100);
        assert_eq!(batch.shard_id, 0);
    }

    /// `add_events(vec![])` is **not** a no-op. The activate-failure
    /// rollback path in `bus.rs` and the BatchWorker's recv-timeout
    /// arm both rely on the empty-input call to drive a
    /// `check_timeout`, which can flush `current_batch` if
    /// `max_delay` has elapsed. A refactor that makes
    /// `add_events([])` a true no-op would silently lose those
    /// already-batched events on the rollback path, so the behavior
    /// is pinned here.
    #[test]
    fn add_events_empty_can_flush_via_timeout() {
        // The delay must be comfortably longer than the time the
        // "not yet" assertions below need to run; a 1 ms delay lets an
        // ordinary scheduler pause turn them into spurious failures.
        let max_delay = Duration::from_millis(50);
        let config = BatchConfig {
            min_size: 10,
            max_size: 1000,
            max_delay,
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Stage some events well below `min_size` so neither size
        // threshold can hide the timeout-flush.
        let pre = worker.add_events(make_events(3, 0));
        assert!(pre.is_none(), "below min_size — no flush yet");

        // Empty input *before* max_delay must return None: the side
        // effect is "check timeout", not "always flush".
        let early = worker.add_events(vec![]);
        assert!(
            early.is_none(),
            "empty input before max_delay must NOT flush — \
             check_timeout returns None when start.elapsed() < max_delay"
        );

        // `thread::sleep` sleeps for at least the requested time, so
        // the deadline has certainly passed afterwards.
        std::thread::sleep(max_delay + Duration::from_millis(10));
        let flushed = worker.add_events(vec![]);
        assert!(
            flushed.is_some(),
            "empty input after max_delay MUST flush via check_timeout — \
             this is the contract bus.rs and BatchWorker's recv-timeout \
             arm rely on; making it a no-op silently loses events on \
             the activate-failure rollback path"
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    #[test]
    fn test_batch_timeout() {
        // See `add_events_empty_can_flush_via_timeout` for why the
        // delay is not set to a single millisecond.
        let max_delay = Duration::from_millis(50);
        let config = BatchConfig {
            min_size: 10,
            max_size: 1000,
            max_delay,
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events
        let batch = worker.add_events(make_events(5, 0));
        assert!(batch.is_none());

        // Wait for timeout
        std::thread::sleep(max_delay + Duration::from_millis(10));

        // Check timeout triggers flush
        let batch = worker.add_events(vec![]);
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().events.len(), 5);
    }

    #[test]
    fn test_adaptive_batch_sizing() {
        let config = BatchConfig {
            min_size: 100,
            max_size: 10_000,
            max_delay: Duration::from_secs(10),
            adaptive: true,
            velocity_window: Duration::from_millis(100),
        };

        let mut batcher = AdaptiveBatcher::new(config);

        // Initially should be at min_size
        assert_eq!(batcher.batch_size(), 100);

        // Simulate high velocity (add lots of events quickly)
        for _ in 0..100 {
            batcher.record_events(10_000);
            std::thread::sleep(Duration::from_micros(100));
        }

        // Batch size should have increased
        assert!(batcher.batch_size() > 100);
    }

    /// Regression: `recalculate_batch_size` once computed
    /// `current_batch_size * 3 + target` with bare arithmetic. With a
    /// hostile `BatchConfig` whose `max_size` is near `usize::MAX / 3`
    /// or above, the multiply overflows — a panic in debug builds, a
    /// wrap to an unrelated value in release builds. Pin overflow-safe
    /// behavior so a later "simplification" of the arithmetic is caught
    /// here rather than in production.
    #[test]
    fn recalculate_batch_size_saturates_on_hostile_max_size() {
        let config = BatchConfig {
            min_size: 1,
            max_size: usize::MAX,
            max_delay: Duration::from_secs(10),
            adaptive: true,
            velocity_window: Duration::from_millis(100),
        };
        let mut batcher = AdaptiveBatcher::new(config);

        // Drive `current_batch_size` to a value where `* 3` would
        // overflow. The field is module-private but we're in the
        // same module, so direct mutation is fine.
        batcher.current_batch_size = usize::MAX - 1;

        // Must not panic. With no velocity samples the target is
        // `min_size` (1).
        batcher.recalculate_batch_size();

        assert!(
            batcher.current_batch_size >= 1,
            "post-recalc batch size must respect min_size, got {}",
            batcher.current_batch_size,
        );
        // Overflow-safe arithmetic yields at least `usize::MAX / 4`
        // (saturating math gives exactly that value); a wrapping
        // multiply lands below it, so this also catches a silent wrap
        // in release builds where no panic would occur.
        assert!(
            batcher.current_batch_size >= usize::MAX / 4,
            "post-recalc batch size fell below usize::MAX / 4, got {}",
            batcher.current_batch_size,
        );
    }

    #[test]
    fn test_force_flush() {
        let config = BatchConfig {
            min_size: 100,
            max_size: 1000,
            max_delay: Duration::from_secs(10),
            adaptive: false,
            velocity_window: Duration::from_millis(100),
        };

        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events (below threshold)
        worker.add_events(make_events(50, 0));
        assert_eq!(worker.pending_count(), 50);

        // Force flush
        let batch = worker.flush();
        assert_eq!(batch.events.len(), 50);
        assert!(!worker.has_pending());
    }

    #[test]
    fn test_sequence_numbers() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config.clone(), fresh_published(), 0);

        // Create batches and verify sequence numbers
        worker.add_events(make_events(100, 0));
        let batch1 = worker.flush();
        assert_eq!(batch1.sequence_start, 0);

        worker.add_events(make_events(50, 0));
        let batch2 = worker.flush();
        assert_eq!(batch2.sequence_start, 100);

        worker.add_events(make_events(25, 0));
        let batch3 = worker.flush();
        assert_eq!(batch3.sequence_start, 150);
    }

    /// Regression: every `flush` must publish the post-flush
    /// `next_sequence` to the shared atomic so
    /// `bus::remove_shard_internal` can read it after awaiting the
    /// worker and use it as the stranded-flush `sequence_start`.
    /// Previously the stranded batch hardcoded 0, colliding with the
    /// worker's first batch under JetStream's dedup window.
    #[test]
    fn flush_publishes_post_flush_next_sequence_to_shared_atomic() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        // Pre-flush: atomic is at its initial value.
        assert_eq!(published.load(Ordering::Acquire), 0);

        worker.add_events(make_events(50, 0));
        let _ = worker.flush();

        assert_eq!(
            published.load(Ordering::Acquire),
            50,
            "post-flush atomic must mirror BatchWorker::next_sequence",
        );
    }

    /// Consecutive flushes keep the published atomic in lock-step
    /// with the internal counter — pin the addition (not just the
    /// initial set) so a refactor that updates only on alternate
    /// flushes (or only when `events.is_empty()`) gets caught.
    #[test]
    fn flush_publishes_advance_consecutive_flushes() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.add_events(make_events(10, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 10);

        worker.add_events(make_events(7, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 17);

        worker.add_events(make_events(33, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 50);
    }

    /// Mirror the saturating-add overflow behavior on the published
    /// atomic. `bus::remove_shard_internal` uses this value as a
    /// `sequence_start`; if it ever wrapped around, the stranded
    /// batch's msg-ids could collide with ids the worker already
    /// emitted — the exact JetStream-dedup hazard the stranded-flush
    /// path is designed to avoid.
    #[test]
    fn flush_publishes_saturating_max_on_overflow() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.next_sequence = u64::MAX - 3;
        worker.add_events(make_events(10, 0));
        let _ = worker.flush();

        assert_eq!(worker.next_sequence, u64::MAX);
        assert_eq!(
            published.load(Ordering::Acquire),
            u64::MAX,
            "published atomic must saturate at u64::MAX, not wrap to 6",
        );
    }

    /// Regression: BUG_REPORT.md #19 — `next_sequence` previously used
    /// unchecked `+=`, which would silently wrap on overflow. Saturating
    /// pins it at `u64::MAX` so downstream consumers see a stable
    /// terminal state instead of a restarted sequence.
    #[test]
    fn test_sequence_saturates_on_overflow() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Force the counter near overflow.
        worker.next_sequence = u64::MAX - 3;

        worker.add_events(make_events(10, 0));
        let batch = worker.flush();

        assert_eq!(batch.sequence_start, u64::MAX - 3);
        // Without saturation this would wrap to 6 and the next batch
        // would restart sequencing from there.
        assert_eq!(worker.next_sequence, u64::MAX);
    }
}