hyperi-rustlib 2.8.6

There's plenty of sage advice out there about how to run Rust services in production at scale — config cascades, structured logging, masking secrets, multi-backend secrets management, Prometheus, OpenTelemetry, Kafka transports, tiered disk-spillover sinks, adaptive worker pools, graceful shutdown — but almost none of it as code you can just install and use. This is that code. Opinionated, drop-in, working out of the box. The patterns from blog posts, watercooler chats and beers with your Google mates as an actual library — not a framework you assemble from twenty crates and 8 weeks of munging.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
// Project:   hyperi-rustlib
// File:      src/worker/accumulator.rs
// Purpose:   Bounded batch accumulator with time/count/bytes drain thresholds
// Language:  Rust
//
// License:   BUSL-1.1
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Bounded batch accumulator for DFE pipeline batching.
//!
//! Accumulates items from multiple producers (HTTP handlers, gRPC handlers, etc.)
//! and drains them as batches when any threshold is met:
//! - Item count reaches `max_items`
//! - Byte count reaches `max_bytes`
//! - Time since last drain reaches `max_wait`
//!
//! Bounded -- pushers get an error when the channel is full (backpressure).
//! Shutdown-safe -- `drain_remaining()` flushes buffered items.
//!
//! ## Example
//!
//! ```rust,ignore
//! use hyperi_rustlib::worker::BatchAccumulator;
//! use std::time::Duration;
//!
//! let (acc, mut drainer) = BatchAccumulator::new(AccumulatorConfig {
//!     channel_capacity: 1000,              // channel capacity (backpressure bound)
//!     max_items: 100,                      // max items per batch
//!     max_bytes: 1024 * 1024,              // max bytes per batch (1MB)
//!     max_wait: Duration::from_millis(10), // max wait before flush
//! });
//!
//! // Producers push (from HTTP handlers, etc.)
//! let size = payload.len(); // measure before `payload` is moved into `push`
//! acc.push(payload, size).await?;
//!
//! // Consumer drains batches (background task)
//! loop {
//!     let batch = drainer.next_batch().await;
//!     if batch.is_empty() { break; } // shutdown
//!     process_batch(&batch);
//! }
//! ```

use std::time::Duration;

use tokio::sync::mpsc;

/// Accumulator configuration.
///
/// One value of this type is handed to [`BatchAccumulator::new`]; it sizes the
/// bounded channel between producers and the drainer and sets the three
/// thresholds (items, bytes, time) the drainer uses to decide when a batch is
/// ready.
#[derive(Debug, Clone)]
pub struct AccumulatorConfig {
    /// Channel capacity (bounded -- pushers get error when full).
    ///
    /// Passed straight to `tokio::sync::mpsc::channel`, which panics on a
    /// capacity of zero, so this must be at least 1.
    pub channel_capacity: usize,
    /// Maximum items per batch before auto-drain.
    ///
    /// A batch is released as soon as the buffered item count is `>=` this
    /// value. Also used as the initial capacity of the drainer's buffer.
    pub max_items: usize,
    /// Maximum accumulated bytes per batch before auto-drain.
    ///
    /// Compared (`>=`) against the sum of the `byte_size` values reported by
    /// producers; the accumulator never measures payloads itself.
    pub max_bytes: usize,
    /// Maximum time since last drain before auto-flush.
    ///
    /// Used as the duration of the drainer's flush timer. A timer expiry with
    /// nothing buffered does not produce a batch.
    pub max_wait: Duration,
}

impl Default for AccumulatorConfig {
    /// Build the stock configuration: a 10 000-slot channel, batches of up to
    /// 100 items or 1 MiB, and a 10 ms flush timer.
    fn default() -> Self {
        // 1 MiB expressed as a power of two.
        const ONE_MIB: usize = 1 << 20;

        let max_wait = Duration::from_millis(10);

        Self {
            channel_capacity: 10_000,
            max_items: 100,
            max_bytes: ONE_MIB,
            max_wait,
        }
    }
}

/// Push handle -- cloneable, used by producers to send items into the accumulator.
///
/// Wraps the sending half of the bounded Tokio `mpsc` channel created by
/// [`BatchAccumulator::new`]. Every clone feeds the same channel, and therefore
/// the same drainer.
pub struct BatchAccumulator<T> {
    tx: mpsc::Sender<(T, usize)>, // (item, byte_size)
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, making the handle un-cloneable for payload types that are
// not `Clone`, even though cloning only duplicates the channel sender and never
// touches a `T`.
impl<T> Clone for BatchAccumulator<T> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

/// Drain handle -- used by a single consumer to receive batches.
///
/// Owns the receiving half of the channel plus the partially-built batch.
/// Items pulled off the channel are held in `buffer` until one of the
/// configured thresholds is met, at which point the whole buffer is handed to
/// the caller.
pub struct BatchDrainer<T> {
    // Receiving half of the bounded channel; yields `(item, byte_size)` pairs.
    rx: mpsc::Receiver<(T, usize)>,
    // Thresholds (items / bytes / time) supplied at construction.
    config: AccumulatorConfig,
    // Items received from the channel but not yet returned as a batch.
    buffer: Vec<T>,
    // Sum of the producer-reported byte sizes of the items in `buffer`;
    // reset to zero whenever the buffer is handed out.
    buffer_bytes: usize,
}

/// Error when the accumulator channel is full (backpressure).
///
/// Returned by [`BatchAccumulator::push`] when the item could not be enqueued.
/// The `Display` text comes from the `#[error(...)]` attribute below and
/// interpolates `capacity`.
#[derive(Debug, thiserror::Error)]
#[error("accumulator full -- backpressure active ({capacity} items buffered)")]
pub struct AccumulatorFull {
    /// Capacity figure reported by the push handle when the push was rejected.
    pub capacity: usize,
}

impl<T: Send + 'static> BatchAccumulator<T> {
    /// Create a new accumulator + drainer pair.
    ///
    /// Returns `(push_handle, drain_handle)`. The push handle is `Clone` for
    /// sharing across HTTP/gRPC handlers. The drain handle is used by a single
    /// background task to receive batches.
    ///
    /// The channel between the two is bounded by `config.channel_capacity`, and
    /// the drainer's buffer is pre-allocated for `config.max_items` items.
    ///
    /// # Panics
    ///
    /// Panics if `config.channel_capacity` is zero (Tokio's `mpsc::channel`
    /// rejects a zero-sized buffer).
    #[must_use]
    pub fn new(config: AccumulatorConfig) -> (Self, BatchDrainer<T>) {
        let (tx, rx) = mpsc::channel(config.channel_capacity);
        let drainer = BatchDrainer {
            rx,
            // Field initialisers run in source order, so `max_items` is read
            // before `config` is moved into the drainer below.
            buffer: Vec::with_capacity(config.max_items),
            buffer_bytes: 0,
            config,
        };
        (Self { tx }, drainer)
    }

    /// Push an item into the accumulator.
    ///
    /// `byte_size` is used for the bytes threshold. Pass `payload.len()`
    /// (measured before the payload is moved into this call).
    ///
    /// Although declared `async`, this never waits for space: it performs a
    /// single non-blocking `try_send` and fails immediately if the item cannot
    /// be enqueued.
    ///
    /// # Errors
    ///
    /// Returns `AccumulatorFull` if the channel is at capacity (backpressure).
    /// The error's `capacity` field carries the channel's configured bound.
    /// The same error is also returned when the drainer has been dropped,
    /// because every `try_send` failure is mapped to `AccumulatorFull`; use
    /// [`is_closed`](Self::is_closed) to tell the two cases apart.
    pub async fn push(&self, item: T, byte_size: usize) -> Result<(), AccumulatorFull> {
        self.tx
            .try_send((item, byte_size))
            .map_err(|_| AccumulatorFull {
                // `max_capacity()` is the bound the channel was created with.
                // `capacity()` would report the *remaining* free slots, which
                // is ~0 exactly when this error is produced.
                capacity: self.tx.max_capacity(),
            })
    }

    /// Check if the accumulator has been closed (drainer dropped).
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}

/// Wrap a drained block of [`Record`]s and its source acks in a [`WorkBatch`].
///
/// Push-ingest transports (HTTP, gRPC) collect [`Record`]s through a
/// `BatchAccumulator<Record>`. Once a block has been drained it has to be
/// converted into a [`WorkBatch`] before it can be fed onward; this function
/// performs that conversion by forwarding both vectors to `WorkBatch::new`.
///
/// The accumulator itself only transports payload records, so the
/// `commit_tokens` (per-request responders / acks owned by the push source)
/// must be provided by the caller. No relationship between the number of
/// records and the number of tokens is checked here.
///
/// This is a standalone free function over the `Record` element type; the
/// generic [`BatchAccumulator`] / [`BatchDrainer`] types are not modified by it.
///
/// [`Record`]: crate::transport::Record
/// [`WorkBatch`]: crate::transport::WorkBatch
#[cfg(feature = "transport")]
#[must_use]
pub fn records_into_work_batch<T: crate::transport::CommitToken>(
    records: Vec<crate::transport::Record>,
    commit_tokens: Vec<T>,
) -> crate::transport::WorkBatch<T> {
    use crate::transport::WorkBatch;

    WorkBatch::new(records, commit_tokens)
}

impl<T> BatchDrainer<T> {
    /// Wait for the next batch.
    ///
    /// Waits until any threshold is met (items, bytes, or time) and returns the
    /// buffered items. Returns an empty vec only when the channel is closed
    /// (all producers dropped = shutdown) and nothing is left to deliver; items
    /// still buffered at that point are returned first.
    ///
    /// The flush timer is armed once per call and is *not* restarted when an
    /// item arrives, so a steady trickle of items cannot postpone the flush
    /// beyond `max_wait`. If the timer expires while nothing is buffered it is
    /// simply re-armed -- an empty batch is never returned on a timeout.
    ///
    /// Items already moved into the internal buffer stay there if the returned
    /// future is dropped before completing; a later call picks them up.
    pub async fn next_batch(&mut self) -> Vec<T> {
        // If buffer already meets a threshold, drain immediately
        if self.threshold_met() {
            return self.take_buffer();
        }

        // One timer for the whole call. Creating a fresh `sleep` on every loop
        // iteration would reset the deadline each time an item is received and
        // turn `max_wait` into an idle timeout instead of a flush interval.
        let timeout = tokio::time::sleep(self.config.max_wait);
        tokio::pin!(timeout);

        loop {
            tokio::select! {
                biased;

                // Time threshold -- flush whatever we have
                () = &mut timeout => {
                    if self.buffer.is_empty() {
                        // No items at all -- re-arm and keep waiting (don't
                        // return an empty batch). A completed `Sleep` must not
                        // be polled again, so swap in a fresh one.
                        timeout.set(tokio::time::sleep(self.config.max_wait));
                        continue;
                    }
                    return self.take_buffer();
                }

                // New item arrived
                item = self.rx.recv() => {
                    match item {
                        Some((val, size)) => {
                            self.stash(val, size);
                            if self.threshold_met() {
                                return self.take_buffer();
                            }
                        }
                        // Channel closed -- hand out whatever is buffered
                        // (possibly nothing, which signals shutdown).
                        None => return self.take_buffer(),
                    }
                }
            }
        }
    }

    /// Drain any remaining buffered items (for graceful shutdown).
    ///
    /// Non-blocking: pulls everything currently queued in the channel without
    /// waiting, then returns it together with any previously buffered items as
    /// a single batch, ignoring the item/byte thresholds.
    pub fn drain_remaining(&mut self) -> Vec<T> {
        // Drain channel
        while let Ok((val, size)) = self.rx.try_recv() {
            self.stash(val, size);
        }
        self.take_buffer()
    }

    /// Append one received item to the pending batch and account for its size.
    fn stash(&mut self, val: T, size: usize) {
        // Saturate rather than overflow: sizes are caller-supplied, and a
        // wrapped counter would silently defeat the bytes threshold.
        self.buffer_bytes = self.buffer_bytes.saturating_add(size);
        self.buffer.push(val);
    }

    /// True once the pending batch has reached the item or byte limit.
    fn threshold_met(&self) -> bool {
        self.buffer.len() >= self.config.max_items || self.buffer_bytes >= self.config.max_bytes
    }

    /// Hand out the pending batch, leaving an empty buffer and a zeroed byte count.
    fn take_buffer(&mut self) -> Vec<T> {
        self.buffer_bytes = 0;
        std::mem::take(&mut self.buffer)
    }
}

// Unit tests for the accumulator/drainer pair. Each test builds its own
// channel via `BatchAccumulator::new` and disables the thresholds it is not
// exercising (large `max_items`, `usize::MAX` bytes, one-minute `max_wait`).
#[cfg(test)]
mod tests {
    use super::*;

    /// A batch is released as soon as `max_items` items are buffered.
    #[tokio::test]
    async fn test_drain_on_item_count() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 5,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1), // won't trigger
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        // Push 5 items -- should trigger drain
        for i in 0..5 {
            acc.push(i, 1).await.unwrap();
        }

        // Items come back in push order.
        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 5);
        assert_eq!(batch, vec![0, 1, 2, 3, 4]);
    }

    /// A batch is released once the reported byte total reaches `max_bytes`.
    #[tokio::test]
    async fn test_drain_on_byte_threshold() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 1000, // won't trigger
            max_bytes: 10,   // trigger at 10 bytes
            max_wait: Duration::from_mins(1),
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        // Push items with size=3 each -- 4 items = 12 bytes > 10 threshold
        for i in 0..4 {
            acc.push(i, 3).await.unwrap();
        }

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 4);
    }

    /// With neither count nor byte limit reached, the timer flushes the batch.
    #[tokio::test]
    async fn test_drain_on_time_threshold() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 1000,
            max_bytes: usize::MAX,
            max_wait: Duration::from_millis(50), // 50ms
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        // Push 2 items (below count/byte threshold)
        acc.push(1, 1).await.unwrap();
        acc.push(2, 1).await.unwrap();

        // Drain should fire after 50ms timeout
        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 2);
    }

    /// Pushing into a full channel fails instead of waiting.
    #[tokio::test]
    async fn test_backpressure_when_full() {
        let config = AccumulatorConfig {
            channel_capacity: 3,
            max_items: 100,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1),
        };
        // The drainer is kept alive but never polled, so nothing leaves the channel.
        let (acc, _drainer) = BatchAccumulator::<i32>::new(config);

        // Fill to capacity
        acc.push(1, 1).await.unwrap();
        acc.push(2, 1).await.unwrap();
        acc.push(3, 1).await.unwrap();

        // Next push should fail (backpressure)
        let result = acc.push(4, 1).await;
        assert!(result.is_err());
    }

    /// Dropping every push handle flushes the partial batch, then yields empty.
    #[tokio::test]
    async fn test_shutdown_drains_remaining() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 1000,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1),
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        acc.push(10, 1).await.unwrap();
        acc.push(20, 1).await.unwrap();

        // Drop the push handle (simulates shutdown)
        drop(acc);

        // next_batch should return remaining items
        let batch = drainer.next_batch().await;
        assert_eq!(batch, vec![10, 20]);

        // Subsequent call returns empty (channel closed)
        let batch = drainer.next_batch().await;
        assert!(batch.is_empty());
    }

    /// Queued items are split into consecutive batches of at most `max_items`.
    #[tokio::test]
    async fn test_multiple_batches() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 3,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1),
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        // Push 7 items -- should produce 2 full batches + 1 partial
        for i in 0..7 {
            acc.push(i, 1).await.unwrap();
        }
        drop(acc); // signal shutdown to drain the last partial

        let b1 = drainer.next_batch().await;
        assert_eq!(b1.len(), 3);

        let b2 = drainer.next_batch().await;
        assert_eq!(b2.len(), 3);

        let b3 = drainer.next_batch().await;
        assert_eq!(b3.len(), 1); // remaining partial batch

        let b4 = drainer.next_batch().await;
        assert!(b4.is_empty()); // channel closed
    }

    /// A cloned push handle feeds the same drainer as the original.
    #[tokio::test]
    async fn test_push_handle_is_clone() {
        let config = AccumulatorConfig::default();
        let (acc, mut drainer) = BatchAccumulator::new(config);

        let acc2 = acc.clone();

        acc.push(1, 1).await.unwrap();
        acc2.push(2, 1).await.unwrap();

        // Both handles must go before the channel counts as closed.
        drop(acc);
        drop(acc2);

        let batch = drainer.next_batch().await;
        assert_eq!(batch.len(), 2);
    }

    /// `drain_remaining` returns everything queued, in order, without awaiting.
    #[tokio::test]
    async fn test_drain_remaining_on_shutdown() {
        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 1000,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1),
        };
        let (acc, mut drainer) = BatchAccumulator::new(config);

        acc.push(1, 1).await.unwrap();
        acc.push(2, 1).await.unwrap();
        acc.push(3, 1).await.unwrap();
        drop(acc);

        let remaining = drainer.drain_remaining();
        assert_eq!(remaining, vec![1, 2, 3]);
    }

    /// `drain_remaining` on an untouched accumulator yields an empty vec.
    #[tokio::test]
    async fn test_empty_drain_returns_empty() {
        let config = AccumulatorConfig::default();
        let (_acc, mut drainer) = BatchAccumulator::<i32>::new(config);

        let remaining = drainer.drain_remaining();
        assert!(remaining.is_empty());
    }

    /// Push-ingest helper: drained Records + supplied tokens become a WorkBatch.
    #[cfg(feature = "transport")]
    #[tokio::test]
    async fn test_records_drain_into_work_batch() {
        use crate::transport::{CommitToken, PayloadFormat, Record, RecordMeta};
        use bytes::Bytes;

        // Minimal commit token standing in for a push source's per-request ack.
        #[derive(Debug, Clone)]
        struct PushTok(u64);
        impl std::fmt::Display for PushTok {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "push-{}", self.0)
            }
        }
        impl CommitToken for PushTok {}

        // Builds a JSON-format record around a static payload.
        let record = |payload: &'static [u8]| Record {
            payload: Bytes::from_static(payload),
            key: None,
            headers: vec![],
            metadata: RecordMeta {
                timestamp_ms: None,
                format: PayloadFormat::Json,
            },
        };

        let config = AccumulatorConfig {
            channel_capacity: 100,
            max_items: 3,
            max_bytes: usize::MAX,
            max_wait: Duration::from_mins(1),
        };
        let (acc, mut drainer) = BatchAccumulator::<Record>::new(config);
        acc.push(record(b"{\"a\":1}"), 7).await.unwrap();
        acc.push(record(b"{\"b\":2}"), 7).await.unwrap();
        acc.push(record(b"{\"c\":3}"), 7).await.unwrap();

        // Third push reaches `max_items`, so the block is released immediately.
        let block = drainer.next_batch().await;
        assert_eq!(block.len(), 3);

        // Two source acks for a three-record block (push sources ack per request,
        // not per record) -- the helper must NOT tie token count to record count.
        let tokens = vec![PushTok(1), PushTok(2)];
        let wb = records_into_work_batch(block, tokens);
        assert_eq!(wb.record_count(), 3);
        assert_eq!(wb.commit_tokens.len(), 2);
        assert!(wb.dlq_entries.is_empty());
    }
}