Skip to main content

net/shard/
batch.rs

//! Batch aggregation with adaptive sizing.
//!
//! The batch worker continuously drains events from a shard's ring buffer,
//! assembles them into batches, and dispatches them to the adapter.
//!
//! # Adaptive Sizing
//!
//! Batch size is dynamically adjusted based on ingestion velocity:
//! - High velocity → larger batches → fewer adapter calls → higher throughput
//! - Low velocity → smaller batches → lower latency → faster flush
11
12use std::collections::VecDeque;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use crate::config::BatchConfig;
18use crate::event::{Batch, InternalEvent};
19
/// Upper bound on the number of entries kept in
/// `AdaptiveBatcher::velocity_samples`, so per-shard memory stays bounded
/// under high throughput.
///
/// `calculate_velocity` reads only `front()` and `back()`, so samples in
/// between are pure overhead. 1 024 entries × ~24 bytes per tuple
/// ≈ 24 KiB per shard — well below the ~240 KiB worst case of the
/// time-only bound at 100 k events/s × 100 ms `velocity_window`.
const VELOCITY_SAMPLES_CAP: usize = 1024;
27
28/// Adaptive batch size calculator.
29///
30/// Tracks recent ingestion velocity and adjusts batch size accordingly.
31pub struct AdaptiveBatcher {
32    /// Configuration.
33    config: BatchConfig,
34    /// Current target batch size.
35    current_batch_size: usize,
36    /// Velocity samples: (timestamp, cumulative_count).
37    velocity_samples: VecDeque<(Instant, u64)>,
38    /// Total events seen (for velocity calculation).
39    total_events: u64,
40    /// Last recalculation time.
41    last_recalc: Instant,
42}
43
44impl AdaptiveBatcher {
45    /// Create a new adaptive batcher.
46    pub fn new(config: BatchConfig) -> Self {
47        Self {
48            current_batch_size: config.min_size,
49            velocity_samples: VecDeque::with_capacity(100),
50            total_events: 0,
51            last_recalc: Instant::now(),
52            config,
53        }
54    }
55
56    /// Record events and get the current target batch size.
57    ///
58    /// Call this each time events are drained from the ring buffer.
59    #[inline]
60    pub fn record_events(&mut self, count: usize) -> usize {
61        // Saturating-add: a stream that's ingested ~2^64 events
62        // is already in trouble, but a wrap from `u64::MAX` to a
63        // small value would interact with the
64        // `saturating_sub(oldest_count)` in `calculate_velocity`
65        // — the saturating-sub would underflow to 0 across the
66        // wraparound boundary and `velocity` would collapse to 0,
67        // forcing the batcher to its `min_size` floor right when
68        // sustained high throughput is exactly what the adaptive
69        // path was meant to handle. Saturating instead clamps at
70        // `u64::MAX` and `newest - oldest = 0` is the documented
71        // stop state.
72        self.total_events = self.total_events.saturating_add(count as u64);
73
74        if !self.config.adaptive {
75            return self.config.max_size;
76        }
77
78        let now = Instant::now();
79
80        // Add sample
81        self.velocity_samples.push_back((now, self.total_events));
82
83        // Remove old samples outside the time window.
84        //
85        // `Instant - Duration` panics on underflow, and on Windows
86        // `Instant` is QPC-relative to boot — a process that
87        // starts within `velocity_window` (typically a few
88        // seconds) of boot would abort the batch worker task
89        // here. `checked_sub` returns `None` on underflow; in
90        // that case skip the time-based eviction (every existing
91        // sample is "newer than the window floor" by definition,
92        // since the floor predates `Instant::now()`'s zero point).
93        // The sample-count cap below still bounds memory.
94        if let Some(window_start) = now.checked_sub(self.config.velocity_window) {
95            while let Some(&(ts, _)) = self.velocity_samples.front() {
96                if ts < window_start {
97                    self.velocity_samples.pop_front();
98                } else {
99                    break;
100                }
101            }
102        }
103
104        // Also cap the deque by sample COUNT. Pre-fix the
105        // bound was time-only, so at 100k events/s with a 100 ms
106        // velocity_window the deque could grow to ~10 000 entries
107        // before time-eviction caught up, costing ~240 KiB per
108        // shard for samples never used (calculate_velocity reads
109        // only `front()` and `back()`). Cap at
110        // VELOCITY_SAMPLES_CAP so the memory footprint is bounded
111        // regardless of throughput.
112        while self.velocity_samples.len() > VELOCITY_SAMPLES_CAP {
113            self.velocity_samples.pop_front();
114        }
115
116        // Recalculate batch size periodically (not on every call)
117        if now.duration_since(self.last_recalc) > Duration::from_millis(10) {
118            self.recalculate_batch_size();
119            self.last_recalc = now;
120        }
121
122        self.current_batch_size
123    }
124
125    /// Get the current target batch size.
126    #[inline]
127    pub fn batch_size(&self) -> usize {
128        self.current_batch_size
129    }
130
131    /// Calculate events per second based on recent samples.
132    #[expect(
133        clippy::unwrap_used,
134        reason = "len >= 2 guard above; .front() / .back() on a non-empty VecDeque is infallible"
135    )]
136    fn calculate_velocity(&self) -> f64 {
137        if self.velocity_samples.len() < 2 {
138            return 0.0;
139        }
140
141        let (oldest_ts, oldest_count) = *self.velocity_samples.front().unwrap();
142        let (newest_ts, newest_count) = *self.velocity_samples.back().unwrap();
143
144        let elapsed = newest_ts.duration_since(oldest_ts);
145        if elapsed.is_zero() {
146            return 0.0;
147        }
148
149        let events = newest_count.saturating_sub(oldest_count);
150        events as f64 / elapsed.as_secs_f64()
151    }
152
153    /// Recalculate the optimal batch size based on recent velocity.
154    fn recalculate_batch_size(&mut self) {
155        let velocity = self.calculate_velocity();
156
157        // Scale batch size with velocity
158        // At 1M events/sec → batch size ~5,000
159        // At 10M events/sec → batch size ~50,000 (capped at max)
160        //
161        // Explicit `clamp(0.0, usize::MAX as f64)` before the `as
162        // usize` cast: Rust's `as` cast on f64 → usize is
163        // saturating in current versions, but the explicit clamp
164        // documents intent and survives any future edition that
165        // tightens the cast (e.g. requires `try_from` on
166        // overflow). The `velocity > 0.0` guard above already
167        // rules out NaN and negative; the upper bound here only
168        // matters for the unreachable `velocity > usize::MAX *
169        // 200.0` case (~3.7e21 events/sec), but the saturation
170        // is cheaper than reasoning about future cast semantics.
171        let target = if velocity > 0.0 {
172            let scaled = (velocity / 200.0).clamp(0.0, usize::MAX as f64);
173            (scaled as usize).clamp(self.config.min_size, self.config.max_size)
174        } else {
175            self.config.min_size
176        };
177
178        // Smooth transitions using exponential moving average
179        // new = (old * 3 + target) / 4
180        //
181        // Saturating: `BatchConfig::validate` doesn't bound
182        // `max_size` from above, so a hostile config that pushes
183        // `current_batch_size` near `usize::MAX / 3` would
184        // overflow the multiply (debug: panic; release: wrap to a
185        // tiny value, collapsing the batcher to its `min_size`
186        // floor on the next clamp). Saturating preserves the
187        // intent — clamp at `usize::MAX` and let the bounds
188        // clamp below pull it back into the configured window.
189        self.current_batch_size = self
190            .current_batch_size
191            .saturating_mul(3)
192            .saturating_add(target)
193            / 4;
194
195        // Ensure we stay within bounds
196        self.current_batch_size = self
197            .current_batch_size
198            .clamp(self.config.min_size, self.config.max_size);
199    }
200
201    /// Reset the batcher state.
202    pub fn reset(&mut self) {
203        self.velocity_samples.clear();
204        self.total_events = 0;
205        self.current_batch_size = self.config.min_size;
206        self.last_recalc = Instant::now();
207    }
208}
209
210/// Batch worker state.
211///
212/// Manages batch assembly for a single shard.
213pub struct BatchWorker {
214    /// Shard ID.
215    shard_id: u16,
216    /// Adaptive batcher.
217    batcher: AdaptiveBatcher,
218    /// Current batch being assembled.
219    current_batch: Vec<InternalEvent>,
220    /// Sequence number for the next batch.
221    next_sequence: u64,
222    /// Mirror of `next_sequence` published to the bus, so
223    /// `EventBus::remove_shard_internal` can read the worker's
224    /// final post-flush sequence after awaiting the worker's
225    /// `JoinHandle`. Used as the `sequence_start` for the
226    /// stranded-ring-buffer flush so its msg-ids don't collide
227    /// with the worker's own first batch under JetStream's dedup
228    /// window.
229    ///
230    /// Updated on every successful `flush`. The hot path pays one
231    /// release-ordered atomic store per dispatched batch — the
232    /// per-batch dispatch already crosses an `await` so the
233    /// extra store is amortized away.
234    next_sequence_published: Arc<AtomicU64>,
235    /// Producer nonce stamped on every produced `Batch`.
236    ///
237    /// When the bus is configured with `producer_nonce_path`, this
238    /// is the persisted u64 from
239    /// `adapter::PersistentProducerNonce::load_or_create`. When
240    /// not configured, it falls back to the per-process nonce
241    /// from `event::batch_process_nonce`. Adapters that key dedup
242    /// on `(producer_nonce, shard, sequence_start, i)` (today:
243    /// JetStream `Nats-Msg-Id`, Redis `dedup_id` field) use this
244    /// to recognize cross-process retries.
245    producer_nonce: u64,
246    /// Time when the current batch started.
247    batch_start: Option<Instant>,
248    /// Configuration.
249    config: BatchConfig,
250}
251
252impl BatchWorker {
253    /// Create a new batch worker.
254    ///
255    /// `next_sequence_published` is the bus-owned mirror of
256    /// `next_sequence`. Pass `Arc::new(AtomicU64::new(0))` if the
257    /// caller doesn't need to observe the post-exit sequence;
258    /// production paths share it with `bus::remove_shard_internal`.
259    ///
260    /// `producer_nonce` is stamped on every produced `Batch` for
261    /// cross-process dedup. The bus passes its loaded nonce in;
262    /// tests can use any u64 (typically 0 or the per-process
263    /// default).
264    pub fn new(
265        shard_id: u16,
266        config: BatchConfig,
267        next_sequence_published: Arc<AtomicU64>,
268        producer_nonce: u64,
269    ) -> Self {
270        let capacity = config.max_size;
271        Self {
272            shard_id,
273            batcher: AdaptiveBatcher::new(config.clone()),
274            current_batch: Vec::with_capacity(capacity),
275            next_sequence: 0,
276            next_sequence_published,
277            producer_nonce,
278            batch_start: None,
279            config,
280        }
281    }
282
283    /// Add events to the current batch.
284    ///
285    /// Returns a completed batch if thresholds are met, or None if more events are needed.
286    ///
287    /// # Empty-input side effect
288    ///
289    /// Passing an empty `events` vec is **not** a no-op. The
290    /// BatchWorker's recv-timeout arm calls `add_events(vec![])`
291    /// specifically to drive a `check_timeout` round, which may
292    /// flush the in-memory `current_batch` if `max_delay` has
293    /// elapsed since the last event arrived. Callers who want
294    /// "true no-op on empty input" must check `events.is_empty()`
295    /// themselves before calling.
296    ///
297    /// Pre-fix this side effect was not documented and
298    /// surprised callers expecting `add_events([])` to be inert.
299    /// The fix is documentation only — the BatchWorker's timeout
300    /// flush relies on this behavior, so removing the side effect
301    /// would break the timeout-flush mechanism in bus.rs.
302    pub fn add_events(&mut self, events: Vec<InternalEvent>) -> Option<Batch> {
303        if events.is_empty() {
304            return self.check_timeout();
305        }
306
307        // Start batch timer if this is the first event
308        if self.current_batch.is_empty() {
309            self.batch_start = Some(Instant::now());
310        }
311
312        // Record events and get target batch size
313        let target_size = self.batcher.record_events(events.len());
314
315        // Add events to current batch
316        self.current_batch.extend(events);
317
318        // Check if we should flush
319        if self.current_batch.len() >= target_size {
320            return Some(self.flush());
321        }
322
323        // Check timeout
324        self.check_timeout()
325    }
326
327    /// Check if the batch should be flushed due to timeout.
328    ///
329    /// Pre-fix perf #38 in `docs/performance/net-perf-analysis.md`
330    /// this was private and the bus's timeout branch called
331    /// `add_events(vec![])` as the indirection — an empty `Vec`
332    /// allocation per timeout tick purely as a sentinel. The
333    /// method is now `pub` so callers can express the intent
334    /// directly without the alloc and without the documented
335    /// "empty `add_events` has a side effect" footgun.
336    pub fn check_timeout(&mut self) -> Option<Batch> {
337        if self.current_batch.is_empty() {
338            return None;
339        }
340
341        if let Some(start) = self.batch_start {
342            if start.elapsed() >= self.config.max_delay {
343                return Some(self.flush());
344            }
345        }
346
347        None
348    }
349
350    /// Force flush the current batch, even if thresholds aren't met.
351    pub fn flush(&mut self) -> Batch {
352        let events = std::mem::replace(
353            &mut self.current_batch,
354            Vec::with_capacity(self.config.max_size),
355        );
356
357        let sequence_start = self.next_sequence;
358        // `saturating_add` rather than `+=`: at u64 granularity this can
359        // only happen after wraparound (~584 years at 1 B events/s), but
360        // the wrap would silently corrupt sequence numbering. Saturating
361        // pins the counter at u64::MAX so downstream consumers see a
362        // monotonic, observable terminal state instead.
363        self.next_sequence = self.next_sequence.saturating_add(events.len() as u64);
364        // Publish the post-flush counter to the bus-owned mirror.
365        // `bus::remove_shard_internal` reads this after awaiting the
366        // worker's `JoinHandle` and uses it as the
367        // `sequence_start` for the stranded-ring-buffer flush — that
368        // guarantees the stranded msg-ids fall strictly past every
369        // msg-id this worker emitted, closing the JetStream-dedup
370        // collision risk.
371        self.next_sequence_published
372            .store(self.next_sequence, Ordering::Release);
373        self.batch_start = None;
374
375        Batch::with_nonce(self.shard_id, events, sequence_start, self.producer_nonce)
376    }
377
378    /// Check if there are pending events.
379    pub fn has_pending(&self) -> bool {
380        !self.current_batch.is_empty()
381    }
382
383    /// Get the number of pending events.
384    pub fn pending_count(&self) -> usize {
385        self.current_batch.len()
386    }
387
388    /// Get the current target batch size.
389    pub fn target_batch_size(&self) -> usize {
390        self.batcher.batch_size()
391    }
392
393    /// Get time until the current batch times out.
394    pub fn time_until_timeout(&self) -> Option<Duration> {
395        self.batch_start.map(|start| {
396            let elapsed = start.elapsed();
397            self.config.max_delay.saturating_sub(elapsed)
398        })
399    }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Build `count` synthetic events for `shard_id`, each carrying the
    /// payload `{"i": <index>}`.
    fn make_events(count: usize, shard_id: u16) -> Vec<InternalEvent> {
        (0..count)
            .map(|i| InternalEvent::from_value(json!({"i": i}), i as u64, shard_id))
            .collect()
    }

    /// Test helper — most tests don't observe the published sequence,
    /// they just need the third constructor argument.
    fn fresh_published() -> Arc<AtomicU64> {
        Arc::new(AtomicU64::new(0))
    }

    /// Test helper — a `BatchConfig` with the 100 ms velocity window every
    /// test in this module uses.
    fn test_config(
        min_size: usize,
        max_size: usize,
        max_delay: Duration,
        adaptive: bool,
    ) -> BatchConfig {
        BatchConfig {
            min_size,
            max_size,
            max_delay,
            adaptive,
            velocity_window: Duration::from_millis(100),
        }
    }

    #[test]
    fn test_batch_size_threshold() {
        let config = test_config(10, 100, Duration::from_secs(10), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add 50 events - should not trigger flush (target is 100 when adaptive=false)
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_none());
        assert_eq!(worker.pending_count(), 50);

        // Add 50 more - should trigger flush
        let batch = worker.add_events(make_events(50, 0));
        assert!(batch.is_some());
        let batch = batch.unwrap();
        assert_eq!(batch.events.len(), 100);
        assert_eq!(batch.shard_id, 0);
    }

    /// `add_events(vec![])` is **not** a no-op: the empty-input call
    /// drives a `check_timeout`, which flushes `current_batch` once
    /// `max_delay` has elapsed. Callers rely on that, so a refactor that
    /// makes `add_events([])` inert would silently strand already-batched
    /// events. Pin the behavior here.
    ///
    /// The deadline is made "elapsed" by zeroing `max_delay` instead of
    /// sleeping past a 1 ms deadline, so neither assertion depends on
    /// scheduler timing.
    #[test]
    fn add_events_empty_can_flush_via_timeout() {
        let config = test_config(10, 1000, Duration::from_secs(10), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Stage some events well below the size target so the size
        // threshold can't hide the timeout-flush.
        let pre = worker.add_events(make_events(3, 0));
        assert!(pre.is_none(), "below size target — no flush yet");

        // Empty input *before* max_delay must return None: the side
        // effect is bounded to "check timeout", not "always flush".
        let early = worker.add_events(vec![]);
        assert!(
            early.is_none(),
            "empty input before max_delay must NOT flush — \
             check_timeout returns None when start.elapsed() < max_delay"
        );

        // Expire the deadline and call with empty input — must flush.
        worker.config.max_delay = Duration::ZERO;
        let flushed = worker.add_events(vec![]);
        assert!(
            flushed.is_some(),
            "empty input after max_delay MUST flush via check_timeout — \
             making it a no-op silently loses already-batched events"
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    /// The direct `check_timeout()` path must produce the same outcomes
    /// as the `add_events(vec![])` shape, so the two entry points can't
    /// drift apart.
    #[test]
    fn check_timeout_matches_add_events_empty_signal() {
        let config = test_config(10, 1000, Duration::from_secs(10), false);

        // Empty-batch case: None.
        let mut w = BatchWorker::new(0, config, fresh_published(), 0);
        assert!(w.check_timeout().is_none(), "no pending → None");

        // Pending but pre-deadline: None.
        let _ = w.add_events(make_events(3, 0));
        assert!(
            w.check_timeout().is_none(),
            "pending below threshold + pre-deadline → None",
        );

        // Pending past deadline: flush. Zeroing `max_delay` makes the
        // deadline elapsed without relying on a sleep.
        w.config.max_delay = Duration::ZERO;
        let flushed = w.check_timeout();
        assert!(
            flushed.is_some(),
            "pending past max_delay → check_timeout must flush",
        );
        assert_eq!(flushed.unwrap().events.len(), 3);
    }

    /// Real-clock variant of the timeout flush. The deadline is 20 ms
    /// (not 1 ms) so the "no flush yet" assertion has headroom against
    /// scheduler stalls; the sleep is twice the deadline.
    #[test]
    fn test_batch_timeout() {
        let config = test_config(10, 1000, Duration::from_millis(20), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events
        let batch = worker.add_events(make_events(5, 0));
        assert!(batch.is_none());

        // Wait for timeout
        std::thread::sleep(Duration::from_millis(40));

        // Check timeout triggers flush
        let batch = worker.add_events(vec![]);
        assert!(batch.is_some());
        assert_eq!(batch.unwrap().events.len(), 5);
    }

    #[test]
    fn test_adaptive_batch_sizing() {
        let config = test_config(100, 10_000, Duration::from_secs(10), true);
        let mut batcher = AdaptiveBatcher::new(config);

        // Initially should be at min_size
        assert_eq!(batcher.batch_size(), 100);

        // Simulate high velocity (add lots of events quickly). Sleeping
        // 200 µs per iteration guarantees the loop spans more than the
        // 10 ms recalculation interval (99 sleeps ≥ 19.8 ms), so at least
        // one recalculation is certain to run.
        for _ in 0..100 {
            batcher.record_events(10_000);
            std::thread::sleep(Duration::from_micros(200));
        }

        // Batch size should have increased
        assert!(batcher.batch_size() > 100);
    }

    /// Regression: `recalculate_batch_size` previously did
    /// `current_batch_size * 3 + target` with bare arithmetic. A hostile
    /// `BatchConfig` with a huge `max_size` could push
    /// `current_batch_size` to where the multiply overflows — debug build
    /// panics, release wraps. Pin overflow-safety so a future
    /// "simplification" of the arithmetic is caught here.
    #[test]
    fn recalculate_batch_size_saturates_on_hostile_max_size() {
        let config = test_config(1, usize::MAX, Duration::from_secs(10), true);
        let mut batcher = AdaptiveBatcher::new(config);

        // Drive `current_batch_size` to a value where `* 3` would
        // overflow. The field is module-private but we're in a child
        // module, so direct mutation is fine.
        batcher.current_batch_size = usize::MAX - 1;

        // Must not panic on overflow.
        batcher.recalculate_batch_size();

        // No samples were recorded, so the target is `min_size` and the
        // smoothed value is at least a quarter of the previous size. A
        // bare `>= min_size` check would be trivially true after the
        // final clamp; this bound actually detects a collapse.
        assert!(
            batcher.current_batch_size >= usize::MAX / 4,
            "post-recalc batch size collapsed, got {}",
            batcher.current_batch_size,
        );
    }

    #[test]
    fn test_force_flush() {
        let config = test_config(100, 1000, Duration::from_secs(10), false);
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Add some events (below threshold)
        worker.add_events(make_events(50, 0));
        assert_eq!(worker.pending_count(), 50);

        // Force flush
        let batch = worker.flush();
        assert_eq!(batch.events.len(), 50);
        assert!(!worker.has_pending());
    }

    #[test]
    fn test_sequence_numbers() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config.clone(), fresh_published(), 0);

        // Create batches and verify sequence numbers
        worker.add_events(make_events(100, 0));
        let batch1 = worker.flush();
        assert_eq!(batch1.sequence_start, 0);

        worker.add_events(make_events(50, 0));
        let batch2 = worker.flush();
        assert_eq!(batch2.sequence_start, 100);

        worker.add_events(make_events(25, 0));
        let batch3 = worker.flush();
        assert_eq!(batch3.sequence_start, 150);
    }

    /// Regression: every `flush` must publish the post-flush
    /// `next_sequence` to the shared atomic so
    /// `bus::remove_shard_internal` can read it after awaiting the worker
    /// and use it as the stranded-flush `sequence_start`. Previously the
    /// stranded batch hardcoded 0, colliding with the worker's first
    /// batch under JetStream's dedup window.
    #[test]
    fn flush_publishes_post_flush_next_sequence_to_shared_atomic() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        // Pre-flush: atomic is at its initial value.
        assert_eq!(published.load(Ordering::Acquire), 0);

        worker.add_events(make_events(50, 0));
        let _ = worker.flush();

        assert_eq!(
            published.load(Ordering::Acquire),
            50,
            "post-flush atomic must mirror BatchWorker::next_sequence",
        );
    }

    /// Consecutive flushes keep the published atomic in lock-step with
    /// the internal counter — pin the addition (not just the initial set)
    /// so a refactor that updates only on some flushes gets caught.
    #[test]
    fn flush_publishes_advance_consecutive_flushes() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.add_events(make_events(10, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 10);

        worker.add_events(make_events(7, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 17);

        worker.add_events(make_events(33, 0));
        let _ = worker.flush();
        assert_eq!(published.load(Ordering::Acquire), 50);
    }

    /// Mirror the saturating-add overflow behavior on the published
    /// atomic. `bus::remove_shard_internal` uses this value as a
    /// `sequence_start`; if it ever wrapped back towards 0 the stranded
    /// batch's msg-ids would collide with the worker's first batch.
    #[test]
    fn flush_publishes_saturating_max_on_overflow() {
        let config = BatchConfig::default();
        let published = Arc::new(AtomicU64::new(0));
        let mut worker = BatchWorker::new(0, config, published.clone(), 0);

        worker.next_sequence = u64::MAX - 3;
        worker.add_events(make_events(10, 0));
        let _ = worker.flush();

        assert_eq!(worker.next_sequence, u64::MAX);
        assert_eq!(
            published.load(Ordering::Acquire),
            u64::MAX,
            "published atomic must saturate at u64::MAX, not wrap to 6",
        );
    }

    /// Regression: BUG_REPORT.md #19 — `next_sequence` previously used
    /// unchecked `+=`, which would silently wrap on overflow. Saturating
    /// pins it at `u64::MAX` so downstream consumers see a stable
    /// terminal state instead of restarting near 0.
    #[test]
    fn test_sequence_saturates_on_overflow() {
        let config = BatchConfig::default();
        let mut worker = BatchWorker::new(0, config, fresh_published(), 0);

        // Force the counter near overflow.
        worker.next_sequence = u64::MAX - 3;

        worker.add_events(make_events(10, 0));
        let batch = worker.flush();

        assert_eq!(batch.sequence_start, u64::MAX - 3);
        // Without saturation this would wrap to 6 and the next batch
        // would restart sequencing from there.
        assert_eq!(worker.next_sequence, u64::MAX);
    }
}
755}