Skip to main content

hyperi_rustlib/worker/
accumulator.rs

// Project:   hyperi-rustlib
// File:      src/worker/accumulator.rs
// Purpose:   Bounded batch accumulator with time/count/bytes drain thresholds
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED
8
//! Bounded batch accumulator for DFE pipeline batching.
//!
//! Accumulates items from multiple producers (HTTP handlers, gRPC handlers, etc.)
//! and drains them as batches when any threshold is met:
//! - Item count reaches `max_items`
//! - Byte count reaches `max_bytes`
//! - Time since last drain reaches `max_wait`
//!
//! Bounded -- pushers get an error when the channel is full (backpressure).
//! Shutdown-safe -- `drain_remaining()` flushes buffered items.
//!
//! ## Example
//!
//! ```rust,ignore
//! use hyperi_rustlib::worker::{AccumulatorConfig, BatchAccumulator};
//! use std::time::Duration;
//!
//! let (acc, mut drainer) = BatchAccumulator::new(AccumulatorConfig {
//!     channel_capacity: 1000,              // channel capacity (backpressure bound)
//!     max_items: 100,                      // max items per batch
//!     max_bytes: 1024 * 1024,              // max bytes per batch (1MB)
//!     max_wait: Duration::from_millis(10), // max wait before flush
//! });
//!
//! // Producers push (from HTTP handlers, etc.). Measure the payload before it
//! // is moved into `push`.
//! let len = payload.len();
//! acc.push(payload, len).await?;
//!
//! // Consumer drains batches (background task)
//! loop {
//!     let batch = drainer.next_batch().await;
//!     if batch.is_empty() { break; } // shutdown
//!     process_batch(&batch);
//! }
//! ```
43
44use std::time::Duration;
45
46use tokio::sync::mpsc;
47
/// Accumulator configuration.
///
/// Passed by value to [`BatchAccumulator::new`]. The thresholds are checked
/// independently: the drainer releases a batch as soon as *any* one is met.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccumulatorConfig {
    /// Channel capacity (bounded -- pushers get an error when full).
    ///
    /// Must be non-zero: tokio's bounded `mpsc` channel panics on a zero
    /// buffer, so [`BatchAccumulator::new`] panics for `0`.
    pub channel_capacity: usize,
    /// Maximum items per batch before auto-drain.
    pub max_items: usize,
    /// Maximum accumulated bytes per batch before auto-drain.
    ///
    /// Sizes are whatever producers pass to `push`; `usize::MAX` effectively
    /// disables this threshold.
    pub max_bytes: usize,
    /// Maximum time since last drain before auto-flush.
    pub max_wait: Duration,
}

impl Default for AccumulatorConfig {
    /// 10k-slot channel, batches of up to 100 items / 1 MiB, 10 ms flush interval.
    fn default() -> Self {
        const ONE_MIB: usize = 1 << 20;

        Self {
            channel_capacity: 10_000,
            max_items: 100,
            max_bytes: ONE_MIB,
            max_wait: Duration::from_millis(10),
        }
    }
}
71
72/// Push handle -- cloneable, used by producers to send items into the accumulator.
73#[derive(Clone)]
74pub struct BatchAccumulator<T> {
75    tx: mpsc::Sender<(T, usize)>, // (item, byte_size)
76}
77
78/// Drain handle -- used by a single consumer to receive batches.
79pub struct BatchDrainer<T> {
80    rx: mpsc::Receiver<(T, usize)>,
81    config: AccumulatorConfig,
82    buffer: Vec<T>,
83    buffer_bytes: usize,
84}
85
86/// Error when the accumulator channel is full (backpressure).
87#[derive(Debug, thiserror::Error)]
88#[error("accumulator full -- backpressure active ({capacity} items buffered)")]
89pub struct AccumulatorFull {
90    pub capacity: usize,
91}
92
93impl<T: Send + 'static> BatchAccumulator<T> {
94    /// Create a new accumulator + drainer pair.
95    ///
96    /// Returns `(push_handle, drain_handle)`. The push handle is `Clone` for
97    /// sharing across HTTP/gRPC handlers. The drain handle is used by a single
98    /// background task to receive batches.
99    #[must_use]
100    pub fn new(config: AccumulatorConfig) -> (Self, BatchDrainer<T>) {
101        let (tx, rx) = mpsc::channel(config.channel_capacity);
102        let drainer = BatchDrainer {
103            rx,
104            buffer: Vec::with_capacity(config.max_items),
105            buffer_bytes: 0,
106            config: config.clone(),
107        };
108        (Self { tx }, drainer)
109    }
110
111    /// Push an item into the accumulator.
112    ///
113    /// `byte_size` is used for the bytes threshold. Pass `payload.len()`.
114    ///
115    /// # Errors
116    ///
117    /// Returns `AccumulatorFull` if the channel is at capacity (backpressure).
118    pub async fn push(&self, item: T, byte_size: usize) -> Result<(), AccumulatorFull> {
119        self.tx
120            .try_send((item, byte_size))
121            .map_err(|_| AccumulatorFull {
122                capacity: self.tx.capacity(),
123            })
124    }
125
126    /// Check if the accumulator has been closed (drainer dropped).
127    #[must_use]
128    pub fn is_closed(&self) -> bool {
129        self.tx.is_closed()
130    }
131}
132
/// Bridge a drained block of [`Record`]s into a [`WorkBatch`] for push-ingest sources.
///
/// HTTP and gRPC push transports collect [`Record`]s through a
/// `BatchAccumulator<Record>`. On drain, the block has to become the canonical
/// [`WorkBatch`] currency that feeds the engine driver. This helper does that:
/// it pairs the drained records with the caller-supplied source commit tokens
/// (the per-request responders / acks) into one block.
///
/// Purely additive -- the generic [`BatchAccumulator`] / [`BatchDrainer`] core
/// is not involved; this is a free function over the `Record` element type.
/// Tokens come from the caller because the push source owns the ack (an HTTP
/// responder, a gRPC stream slot), while the accumulator only carries payload
/// records. The token count is independent of the record count.
///
/// [`Record`]: crate::transport::Record
/// [`WorkBatch`]: crate::transport::WorkBatch
#[cfg(feature = "transport")]
#[must_use]
pub fn records_into_work_batch<T>(
    records: Vec<crate::transport::Record>,
    commit_tokens: Vec<T>,
) -> crate::transport::WorkBatch<T>
where
    T: crate::transport::CommitToken,
{
    use crate::transport::WorkBatch;

    WorkBatch::new(records, commit_tokens)
}
157
158impl<T> BatchDrainer<T> {
159    /// Wait for the next batch.
160    ///
161    /// Blocks until any threshold is met (items, bytes, or time). Returns
162    /// an empty vec when the channel is closed (all producers dropped = shutdown).
163    pub async fn next_batch(&mut self) -> Vec<T> {
164        // If buffer already meets a threshold, drain immediately
165        if self.threshold_met() {
166            return self.take_buffer();
167        }
168
169        // Wait for items with a timeout
170        loop {
171            let timeout = tokio::time::sleep(self.config.max_wait);
172
173            tokio::select! {
174                biased;
175
176                // Time threshold -- flush whatever we have
177                () = timeout => {
178                    if self.buffer.is_empty() {
179                        // No items at all -- keep waiting (don't return empty batch)
180                        continue;
181                    }
182                    return self.take_buffer();
183                }
184
185                // New item arrived
186                item = self.rx.recv() => {
187                    match item {
188                        Some((val, size)) => {
189                            self.buffer_bytes += size;
190                            self.buffer.push(val);
191                            if self.threshold_met() {
192                                return self.take_buffer();
193                            }
194                        }
195                        None => {
196                            // Channel closed -- drain remaining
197                            return self.take_buffer();
198                        }
199                    }
200                }
201            }
202        }
203    }
204
205    /// Drain any remaining buffered items (for graceful shutdown).
206    pub fn drain_remaining(&mut self) -> Vec<T> {
207        // Drain channel
208        while let Ok((val, size)) = self.rx.try_recv() {
209            self.buffer_bytes += size;
210            self.buffer.push(val);
211        }
212        self.take_buffer()
213    }
214
215    fn threshold_met(&self) -> bool {
216        self.buffer.len() >= self.config.max_items || self.buffer_bytes >= self.config.max_bytes
217    }
218
219    fn take_buffer(&mut self) -> Vec<T> {
220        self.buffer_bytes = 0;
221        std::mem::take(&mut self.buffer)
222    }
223}
224
#[cfg(test)]
mod tests {
    use super::*;

    /// A wait long enough that the time threshold never fires during a test.
    fn never() -> Duration {
        Duration::from_mins(1)
    }

    /// Build a config from its four knobs, in field-declaration order.
    fn make_config(
        channel_capacity: usize,
        max_items: usize,
        max_bytes: usize,
        max_wait: Duration,
    ) -> AccumulatorConfig {
        AccumulatorConfig {
            channel_capacity,
            max_items,
            max_bytes,
            max_wait,
        }
    }

    #[tokio::test]
    async fn test_drain_on_item_count() {
        let (acc, mut drainer) = BatchAccumulator::new(make_config(100, 5, usize::MAX, never()));

        // Exactly `max_items` pushes -- the count threshold alone releases the batch.
        for n in 0..5 {
            acc.push(n, 1).await.unwrap();
        }

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 5);
        assert_eq!(batch, (0..5).collect::<Vec<i32>>());
    }

    #[tokio::test]
    async fn test_drain_on_byte_threshold() {
        // Count threshold is out of reach; only the 10-byte budget can trigger.
        let (acc, mut drainer) = BatchAccumulator::new(make_config(100, 1000, 10, never()));

        // 3 bytes apiece: four pushes total 12 bytes, past the budget.
        for n in 0..4 {
            acc.push(n, 3).await.unwrap();
        }

        assert_eq!(drainer.next_batch().await.len(), 4);
    }

    #[tokio::test]
    async fn test_drain_on_time_threshold() {
        let (acc, mut drainer) = BatchAccumulator::new(make_config(
            100,
            1000,
            usize::MAX,
            Duration::from_millis(50),
        ));

        // Two small items stay below both the count and byte thresholds...
        for value in [1, 2] {
            acc.push(value, 1).await.unwrap();
        }

        // ...so only the 50ms timer can release them.
        assert_eq!(drainer.next_batch().await.len(), 2);
    }

    #[tokio::test]
    async fn test_backpressure_when_full() {
        // Hold the drainer so the channel stays open (full, not closed).
        let (acc, _drainer) =
            BatchAccumulator::<i32>::new(make_config(3, 100, usize::MAX, never()));

        for value in 1..=3 {
            acc.push(value, 1).await.unwrap();
        }

        // A fourth item does not fit in the 3-slot channel.
        assert!(acc.push(4, 1).await.is_err());
    }

    #[tokio::test]
    async fn test_shutdown_drains_remaining() {
        let (acc, mut drainer) =
            BatchAccumulator::new(make_config(100, 1000, usize::MAX, never()));

        for value in [10, 20] {
            acc.push(value, 1).await.unwrap();
        }

        // Dropping the only push handle closes the channel (simulated shutdown).
        drop(acc);

        // The first call hands back what was buffered...
        assert_eq!(drainer.next_batch().await, vec![10, 20]);
        // ...and later calls report the closed channel with an empty batch.
        assert!(drainer.next_batch().await.is_empty());
    }

    #[tokio::test]
    async fn test_multiple_batches() {
        let (acc, mut drainer) = BatchAccumulator::new(make_config(100, 3, usize::MAX, never()));

        // Seven items with a batch size of three: two full batches, then a partial.
        for n in 0..7 {
            acc.push(n, 1).await.unwrap();
        }
        drop(acc); // closing the channel is what releases the trailing partial batch

        let mut sizes: Vec<usize> = Vec::with_capacity(4);
        for _ in 0..4 {
            sizes.push(drainer.next_batch().await.len());
        }
        // The fourth call observes the closed channel and yields nothing.
        assert_eq!(sizes, [3_usize, 3, 1, 0]);
    }

    #[tokio::test]
    async fn test_push_handle_is_clone() {
        let (acc, mut drainer) = BatchAccumulator::new(AccumulatorConfig::default());
        let acc2 = acc.clone();

        // Both handles feed the same channel.
        acc.push(1, 1).await.unwrap();
        acc2.push(2, 1).await.unwrap();

        // The channel closes only once every handle is gone.
        drop((acc, acc2));

        assert_eq!(drainer.next_batch().await.len(), 2);
    }

    #[tokio::test]
    async fn test_drain_remaining_on_shutdown() {
        let (acc, mut drainer) =
            BatchAccumulator::new(make_config(100, 1000, usize::MAX, never()));

        for value in 1..=3 {
            acc.push(value, 1).await.unwrap();
        }
        drop(acc);

        // The synchronous flush pulls everything still queued in the channel.
        assert_eq!(drainer.drain_remaining(), vec![1, 2, 3]);
    }

    #[tokio::test]
    async fn test_empty_drain_returns_empty() {
        // `_acc` (not `_`) keeps the push handle alive for the whole test.
        let (_acc, mut drainer) = BatchAccumulator::<i32>::new(AccumulatorConfig::default());

        assert!(drainer.drain_remaining().is_empty());
    }

    /// Push-ingest helper: drained Records + supplied tokens become a WorkBatch.
    #[cfg(feature = "transport")]
    #[tokio::test]
    async fn test_records_drain_into_work_batch() {
        use crate::transport::{CommitToken, PayloadFormat, Record, RecordMeta};
        use bytes::Bytes;

        #[derive(Debug, Clone)]
        struct PushTok(u64);

        impl std::fmt::Display for PushTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "push-{}", self.0)
            }
        }

        impl CommitToken for PushTok {}

        fn json_record(payload: &'static [u8]) -> Record {
            Record {
                payload: Bytes::from_static(payload),
                key: None,
                headers: vec![],
                metadata: RecordMeta {
                    timestamp_ms: None,
                    format: PayloadFormat::Json,
                },
            }
        }

        let (acc, mut drainer) =
            BatchAccumulator::<Record>::new(make_config(100, 3, usize::MAX, never()));

        // Each payload is 7 bytes, so `len()` reports the same size as before.
        for payload in [b"{\"a\":1}", b"{\"b\":2}", b"{\"c\":3}"] {
            acc.push(json_record(payload), payload.len()).await.unwrap();
        }

        let block = drainer.next_batch().await;
        assert_eq!(block.len(), 3);

        // Two acks for three records: push sources ack per request, not per
        // record, so the helper must NOT couple token count to record count.
        let wb = records_into_work_batch(block, vec![PushTok(1), PushTok(2)]);
        assert_eq!(wb.record_count(), 3);
        assert_eq!(wb.commit_tokens.len(), 2);
        assert!(wb.dlq_entries.is_empty());
    }
}