Skip to main content

hyperi_rustlib/worker/
accumulator.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/accumulator.rs
3// Purpose:   Bounded batch accumulator with time/count/bytes drain thresholds
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Bounded batch accumulator for DFE pipeline batching.
10//!
11//! Accumulates items from multiple producers (HTTP handlers, gRPC handlers, etc.)
12//! and drains them as batches when any threshold is met:
13//! - Item count reaches `max_items`
14//! - Byte count reaches `max_bytes`
15//! - Time since last drain reaches `max_wait`
16//!
17//! Bounded -- pushers get an error when the channel is full (backpressure).
18//! Shutdown-safe -- `drain_remaining()` flushes buffered items.
19//!
20//! ## Example
21//!
22//! ```rust,ignore
//! use hyperi_rustlib::worker::{AccumulatorConfig, BatchAccumulator};
//! use std::time::Duration;
//!
//! let (acc, mut drainer) = BatchAccumulator::new(AccumulatorConfig {
//!     channel_capacity: 1000,              // channel capacity (backpressure bound)
//!     max_items: 100,                      // max items per batch
//!     max_bytes: 1024 * 1024,              // max bytes per batch (1MB)
//!     max_wait: Duration::from_millis(10), // max wait before flush
//! });
//!
//! // Producers push (from HTTP handlers, etc.). Take the size first: `push`
//! // consumes the payload.
//! let size = payload.len();
//! acc.push(payload, size).await?;
35//!
36//! // Consumer drains batches (background task)
37//! loop {
38//!     let batch = drainer.next_batch().await;
39//!     if batch.is_empty() { break; } // shutdown
40//!     process_batch(&batch);
41//! }
42//! ```
43
44use std::time::Duration;
45
46use tokio::sync::mpsc;
47
/// Tunables for one accumulator / drainer pair.
///
/// A batch is released as soon as any one of the item, byte or time limits
/// below is reached.
#[derive(Debug, Clone)]
pub struct AccumulatorConfig {
    /// Size of the bounded channel between producers and the drainer. A push
    /// into a full channel is rejected with an error instead of waiting.
    pub channel_capacity: usize,
    /// Number of buffered items at which a batch is drained automatically.
    pub max_items: usize,
    /// Accumulated byte size at which a batch is drained automatically.
    pub max_bytes: usize,
    /// Longest time to wait since the last drain before flushing whatever
    /// has been buffered.
    pub max_wait: Duration,
}

impl Default for AccumulatorConfig {
    /// Defaults: a 10 000-slot channel, batches of up to 100 items or 1 MiB,
    /// flushed at least every 10 ms.
    fn default() -> Self {
        const ONE_MIB: usize = 1024 * 1024;

        let max_wait = Duration::from_millis(10);
        Self {
            channel_capacity: 10_000,
            max_items: 100,
            max_bytes: ONE_MIB,
            max_wait,
        }
    }
}
71
72/// Push handle -- cloneable, used by producers to send items into the accumulator.
73#[derive(Clone)]
74pub struct BatchAccumulator<T> {
75    tx: mpsc::Sender<(T, usize)>, // (item, byte_size)
76}
77
/// Drain handle -- used by a single consumer to receive batches.
///
/// Owns the receiving side of the channel plus the partially built batch, so
/// items pulled off the channel stay here until a batch is handed out.
pub struct BatchDrainer<T> {
    /// Receiving half of the channel; each message is `(item, byte_size)`.
    rx: mpsc::Receiver<(T, usize)>,
    /// Thresholds (`max_items`, `max_bytes`, `max_wait`) that decide when the
    /// buffer is released as a batch.
    config: AccumulatorConfig,
    /// Items received from the channel but not yet returned in a batch.
    buffer: Vec<T>,
    /// Sum of the producer-reported byte sizes of the items in `buffer`;
    /// reset to zero whenever the buffer is taken.
    buffer_bytes: usize,
}
85
/// Error when the accumulator channel is full (backpressure).
///
/// This is the only error `BatchAccumulator::push` produces; it is built
/// whenever the non-blocking send into the channel fails.
#[derive(Debug, thiserror::Error)]
#[error("accumulator full -- backpressure active ({capacity} items buffered)")]
pub struct AccumulatorFull {
    /// Configured maximum capacity of the channel (not the number of free
    /// slots remaining at the time of the failure).
    pub capacity: usize,
}
92
93impl<T: Send + 'static> BatchAccumulator<T> {
94    /// Create a new accumulator + drainer pair.
95    ///
96    /// Returns `(push_handle, drain_handle)`. The push handle is `Clone` for
97    /// sharing across HTTP/gRPC handlers. The drain handle is used by a single
98    /// background task to receive batches.
99    #[must_use]
100    pub fn new(config: AccumulatorConfig) -> (Self, BatchDrainer<T>) {
101        let (tx, rx) = mpsc::channel(config.channel_capacity);
102        let drainer = BatchDrainer {
103            rx,
104            buffer: Vec::with_capacity(config.max_items),
105            buffer_bytes: 0,
106            config: config.clone(),
107        };
108        (Self { tx }, drainer)
109    }
110
111    /// Push an item into the accumulator.
112    ///
113    /// `byte_size` is used for the bytes threshold. Pass `payload.len()`.
114    ///
115    /// # Errors
116    ///
117    /// Returns `AccumulatorFull` if the channel is at capacity (backpressure).
118    pub async fn push(&self, item: T, byte_size: usize) -> Result<(), AccumulatorFull> {
119        self.tx
120            .try_send((item, byte_size))
121            .map_err(|_| AccumulatorFull {
122                // `max_capacity()` is the configured channel size. `capacity()`
123                // is *remaining* permits (~0 here, since we only build this
124                // error when the channel is full), which rendered a misleading
125                // "(0 items buffered)" diagnostic.
126                capacity: self.tx.max_capacity(),
127            })
128    }
129
130    /// Check if the accumulator has been closed (drainer dropped).
131    #[must_use]
132    pub fn is_closed(&self) -> bool {
133        self.tx.is_closed()
134    }
135}
136
/// Bridge a drained block of [`Record`]s to a [`WorkBatch`].
///
/// Intended for push-ingest transports (HTTP, gRPC) that collect [`Record`]s
/// through a `BatchAccumulator<Record>` and then need the engine's canonical
/// [`WorkBatch`] currency. The accumulator carries only the payload records;
/// the push source owns the ack (an HTTP responder, a gRPC stream slot), so
/// the caller supplies `commit_tokens` alongside the records.
///
/// Both vectors are handed to `WorkBatch::new` unchanged.
///
/// [`Record`]: crate::transport::Record
/// [`WorkBatch`]: crate::transport::WorkBatch
#[cfg(feature = "transport")]
#[must_use]
pub fn records_into_work_batch<T>(
    records: Vec<crate::transport::Record>,
    commit_tokens: Vec<T>,
) -> crate::transport::WorkBatch<T>
where
    T: crate::transport::CommitToken,
{
    use crate::transport::WorkBatch;

    WorkBatch::new(records, commit_tokens)
}
155
156impl<T> BatchDrainer<T> {
157    /// Wait for the next batch.
158    ///
159    /// Blocks until any threshold is met (items, bytes, or time). Returns
160    /// an empty vec when the channel is closed (all producers dropped = shutdown).
161    pub async fn next_batch(&mut self) -> Vec<T> {
162        // If buffer already meets a threshold, drain immediately
163        if self.threshold_met() {
164            return self.take_buffer();
165        }
166
167        // Wait for items against a FIXED deadline. The deadline is set once per
168        // accumulation window (not recreated per arriving item), so trickle
169        // traffic -- items arriving every < max_wait -- cannot defer the flush
170        // indefinitely. The prior code created a fresh `sleep(max_wait)` on every
171        // loop iteration, so each arrival reset the timer and the first buffered
172        // item's latency was unbounded, breaking the documented "time since last
173        // drain reaches max_wait" guarantee.
174        let sleep = tokio::time::sleep(self.config.max_wait);
175        tokio::pin!(sleep);
176
177        loop {
178            tokio::select! {
179                biased;
180
181                // Time threshold -- flush whatever we have
182                () = &mut sleep => {
183                    if self.buffer.is_empty() {
184                        // No items at all -- re-arm a fresh window and keep
185                        // waiting (don't return an empty batch).
186                        sleep
187                            .as_mut()
188                            .reset(tokio::time::Instant::now() + self.config.max_wait);
189                        continue;
190                    }
191                    return self.take_buffer();
192                }
193
194                // New item arrived
195                item = self.rx.recv() => {
196                    match item {
197                        Some((val, size)) => {
198                            self.buffer_bytes += size;
199                            self.buffer.push(val);
200                            if self.threshold_met() {
201                                return self.take_buffer();
202                            }
203                        }
204                        None => {
205                            // Channel closed -- drain remaining
206                            return self.take_buffer();
207                        }
208                    }
209                }
210            }
211        }
212    }
213
214    /// Drain any remaining buffered items (for graceful shutdown).
215    pub fn drain_remaining(&mut self) -> Vec<T> {
216        // Drain channel
217        while let Ok((val, size)) = self.rx.try_recv() {
218            self.buffer_bytes += size;
219            self.buffer.push(val);
220        }
221        self.take_buffer()
222    }
223
224    fn threshold_met(&self) -> bool {
225        self.buffer.len() >= self.config.max_items || self.buffer_bytes >= self.config.max_bytes
226    }
227
228    fn take_buffer(&mut self) -> Vec<T> {
229        self.buffer_bytes = 0;
230        std::mem::take(&mut self.buffer)
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;

    /// A wait long enough that the time threshold never fires during a test.
    fn idle() -> Duration {
        Duration::from_mins(1)
    }

    /// Build a config from its four limits, in declaration order.
    fn limits(
        channel_capacity: usize,
        max_items: usize,
        max_bytes: usize,
        max_wait: Duration,
    ) -> AccumulatorConfig {
        AccumulatorConfig {
            channel_capacity,
            max_items,
            max_bytes,
            max_wait,
        }
    }

    #[tokio::test]
    async fn test_drain_on_item_count() {
        // Only the item-count limit (5) can trip here.
        let (acc, mut drainer) = BatchAccumulator::new(limits(100, 5, usize::MAX, idle()));

        // Exactly `max_items` pushes -> one full batch.
        for value in 0..5 {
            acc.push(value, 1).await.unwrap();
        }

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 5);
        assert_eq!(batch, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn test_drain_on_byte_threshold() {
        // Only the byte limit (10 bytes) can trip here.
        let (acc, mut drainer) = BatchAccumulator::new(limits(100, 1000, 10, idle()));

        // 4 items x 3 bytes = 12 bytes, crossing the 10-byte threshold.
        for value in 0..4 {
            acc.push(value, 3).await.unwrap();
        }

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 4);
    }

    #[tokio::test]
    async fn test_drain_on_time_threshold() {
        // Only the 50ms time limit can trip here.
        let (acc, mut drainer) = BatchAccumulator::new(limits(
            100,
            1000,
            usize::MAX,
            Duration::from_millis(50),
        ));

        // Two items: far below the count and byte limits.
        acc.push(1, 1).await.unwrap();
        acc.push(2, 1).await.unwrap();

        // The batch is released once the 50ms window elapses.
        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 2);
    }

    #[tokio::test]
    async fn test_backpressure_when_full() {
        let (acc, _drainer) = BatchAccumulator::<i32>::new(limits(3, 100, usize::MAX, idle()));

        // Occupy every channel slot.
        for value in 1..=3 {
            acc.push(value, 1).await.unwrap();
        }

        // One more than capacity must be rejected (backpressure).
        let result = acc.push(4, 1).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_backpressure_error_reports_configured_capacity() {
        let (acc, _drainer) = BatchAccumulator::<i32>::new(limits(3, 100, usize::MAX, idle()));

        for value in 1..=3 {
            acc.push(value, 1).await.unwrap();
        }

        let err = acc.push(4, 1).await.expect_err("channel full -> error");
        // The error carries the CONFIGURED capacity (3), not the remaining
        // permits (0).
        assert_eq!(err.capacity, 3);
    }

    /// Steady arrivals faster than `max_wait` must NOT postpone the flush:
    /// the deadline is fixed once per accumulation window, so it fires even
    /// while items keep coming. Guards against a timer that is recreated on
    /// every arrival and therefore never expires under trickle traffic.
    /// `start_paused` makes the clock deterministic.
    #[tokio::test(start_paused = true)]
    async fn test_trickle_traffic_flushes_on_fixed_deadline() {
        // Neither the count nor the byte limit can trip; only the 100ms window.
        let (acc, mut drainer) = BatchAccumulator::<i32>::new(limits(
            100,
            1000,
            usize::MAX,
            Duration::from_millis(100),
        ));

        // Producer: one item every 40ms, well inside the 100ms window.
        tokio::spawn(async move {
            for value in 0..6 {
                acc.push(value, 1).await.unwrap();
                tokio::time::sleep(Duration::from_millis(40)).await;
            }
        });

        let batch = drainer.next_batch().await;
        // The fixed deadline releases the items that landed inside the window
        // (at 0/40/80ms), not all 6; a reset-per-arrival timer would only
        // flush once the producer had stopped.
        assert!(
            !batch.is_empty(),
            "should flush items buffered within the window"
        );
        assert!(
            batch.len() < 6,
            "expected a partial flush at the fixed deadline, got all {} items \
             (timer reset on each arrival?)",
            batch.len()
        );
    }

    #[tokio::test]
    async fn test_shutdown_drains_remaining() {
        let (acc, mut drainer) = BatchAccumulator::new(limits(100, 1000, usize::MAX, idle()));

        acc.push(10, 1).await.unwrap();
        acc.push(20, 1).await.unwrap();

        // Dropping the only push handle simulates shutdown.
        drop(acc);

        // First call hands out what was still queued...
        let leftover = drainer.next_batch().await;
        assert_eq!(leftover, vec![10, 20]);

        // ...and the next one reports the closed channel with an empty batch.
        let after_close = drainer.next_batch().await;
        assert!(after_close.is_empty());
    }

    #[tokio::test]
    async fn test_multiple_batches() {
        let (acc, mut drainer) = BatchAccumulator::new(limits(100, 3, usize::MAX, idle()));

        // 7 items with max_items = 3 -> two full batches plus one partial.
        for value in 0..7 {
            acc.push(value, 1).await.unwrap();
        }
        drop(acc); // shutdown signal so the trailing partial batch is released

        let first = drainer.next_batch().await;
        assert_eq!(first.len(), 3);

        let second = drainer.next_batch().await;
        assert_eq!(second.len(), 3);

        let partial = drainer.next_batch().await;
        assert_eq!(partial.len(), 1); // remaining partial batch

        let closed = drainer.next_batch().await;
        assert!(closed.is_empty()); // channel closed
    }

    #[tokio::test]
    async fn test_push_handle_is_clone() {
        let (acc, mut drainer) = BatchAccumulator::new(AccumulatorConfig::default());

        let second_handle = acc.clone();

        acc.push(1, 1).await.unwrap();
        second_handle.push(2, 1).await.unwrap();

        // The channel only closes once every handle is gone.
        drop(acc);
        drop(second_handle);

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 2);
    }

    #[tokio::test]
    async fn test_drain_remaining_on_shutdown() {
        let (acc, mut drainer) = BatchAccumulator::new(limits(100, 1000, usize::MAX, idle()));

        for value in 1..=3 {
            acc.push(value, 1).await.unwrap();
        }
        drop(acc);

        let remaining = drainer.drain_remaining();
        assert_eq!(remaining, vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_empty_drain_returns_empty() {
        let (_acc, mut drainer) = BatchAccumulator::<i32>::new(AccumulatorConfig::default());

        let remaining = drainer.drain_remaining();
        assert!(remaining.is_empty());
    }

    /// Push-ingest helper: drained Records + supplied tokens become a WorkBatch.
    #[cfg(feature = "transport")]
    #[tokio::test]
    async fn test_records_drain_into_work_batch() {
        use crate::transport::{CommitToken, PayloadFormat, Record, RecordMeta};
        use bytes::Bytes;

        #[derive(Debug, Clone)]
        struct PushTok(u64);
        impl std::fmt::Display for PushTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "push-{}", self.0)
            }
        }
        impl CommitToken for PushTok {}

        /// Minimal JSON record around a static payload.
        fn record(payload: &'static [u8]) -> Record {
            Record {
                payload: Bytes::from_static(payload),
                key: None,
                headers: vec![],
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            }
        }

        let (acc, mut drainer) =
            BatchAccumulator::<Record>::new(limits(100, 3, usize::MAX, idle()));
        acc.push(record(b"{\"a\":1}"), 7).await.unwrap();
        acc.push(record(b"{\"b\":2}"), 7).await.unwrap();
        acc.push(record(b"{\"c\":3}"), 7).await.unwrap();

        let block = drainer.next_batch().await;
        assert_eq!(block.len(), 3);

        // Two source acks for a three-record block (push sources ack per
        // request, not per record) -- the helper must NOT tie token count to
        // record count.
        let tokens = vec![PushTok(1), PushTok(2)];
        let wb = records_into_work_batch(block, tokens);
        assert_eq!(wb.record_count(), 3);
        assert_eq!(wb.commit_tokens.len(), 2);
        assert!(wb.dlq_entries.is_empty());
    }
}