hyperi_rustlib/worker/batch.rs
1// Project: hyperi-rustlib
2// File: src/worker/batch.rs
3// Purpose: BatchProcessor trait and BatchPipeline for parallel-then-sequential processing
4// Language: Rust
5//
6// License: BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Batch processing framework for DFE pipeline parallelisation.
10//!
11//! Provides the [`BatchProcessor`] trait for defining parallel-safe message
12//! processing, and [`BatchPipeline`] for orchestrating the parallel (rayon) →
13//! sequential (state mutation) pipeline.
14//!
15//! ## The Pattern
16//!
17//! Every DFE app follows the same structure:
18//!
19//! 1. **Parallel phase:** Process each message through a pure `&self` function
20//! (parse, route, transform, enrich) -- via rayon `process_batch()`
21//! 2. **Sequential phase:** Apply results to mutable state (buffer push,
22//! mark_pending, stats update, DLQ routing)
23//!
24//! The [`BatchProcessor`] trait captures phase 1. Phase 2 is app-specific
25//! (each app has different buffers, caches, and sinks).
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use hyperi_rustlib::worker::{BatchPipeline, BatchProcessor};
31//!
32//! struct MyProcessor<'a> { router: &'a Router, ... }
33//!
34//! impl BatchProcessor for MyProcessor<'_> {
35//! type Input = KafkaMessage;
36//! type Output = ProcessedMessage;
37//! type Error = MyError;
38//!
39//! fn process(&self, msg: &KafkaMessage) -> Result<ProcessedMessage, MyError> {
40//! let parsed = sonic_rs::from_slice(&msg.payload)?;
41//! let table = self.router.route(&parsed)?;
42//! Ok(ProcessedMessage { table, data: parsed })
43//! }
44//! }
45//!
46//! // In event loop:
47//! let processor = MyProcessor { router: &router, ... };
48//! let results = pipeline.process_batch(&processor, &batch);
49//! drop(processor); // release immutable borrows
50//! // Sequential phase: apply results to mutable state
51//! ```
52
53use std::sync::Arc;
54
55use super::pool::AdaptiveWorkerPool;
56use super::stats::PipelineStats;
57
58/// Trait for parallel-safe message processing.
59///
60/// Implement this with a struct that holds only `&` references to immutable
61/// dependencies. The `process` method must be pure -- no mutable state, no I/O,
62/// no `.await`. Safe for rayon `par_iter()`.
63///
64/// The struct is typically created per-batch in the event loop (borrows released
65/// before the sequential phase begins). The borrow checker enforces this.
66pub trait BatchProcessor: Sync {
67 /// Input message type (e.g. `KafkaMessage`, `HttpRequest`).
68 type Input: Sync;
69
70 /// Successful processing result (e.g. `ProcessedMessage`, `CompressedBatch`).
71 type Output: Send;
72
73 /// Error type for processing failures.
74 type Error: Send;
75
76 /// Process a single input. Must be pure -- no mutation, no I/O.
77 fn process(&self, input: &Self::Input) -> Result<Self::Output, Self::Error>;
78}
79
80/// Orchestrates parallel batch processing via [`AdaptiveWorkerPool`].
81///
82/// Wraps the worker pool with common DFE pipeline concerns: stats tracking,
83/// memory accounting, and metrics emission. Apps provide a [`BatchProcessor`]
84/// implementation; the pipeline handles the rest.
85pub struct BatchPipeline {
86 pool: Arc<AdaptiveWorkerPool>,
87 stats: Arc<PipelineStats>,
88}
89
90impl BatchPipeline {
91 /// Create a new batch pipeline.
92 #[must_use]
93 pub fn new(pool: Arc<AdaptiveWorkerPool>, stats: Arc<PipelineStats>) -> Self {
94 Self { pool, stats }
95 }
96
97 /// Process a batch in parallel via rayon.
98 ///
99 /// Tracks `received` stats automatically. Returns results in input order.
100 /// The caller handles the sequential phase (buffer push, DLQ, etc.).
101 pub fn process_batch<P: BatchProcessor>(
102 &self,
103 processor: &P,
104 batch: &[P::Input],
105 ) -> Vec<Result<P::Output, P::Error>> {
106 self.stats.add_received(batch.len() as u64);
107 self.pool
108 .process_batch(batch, |input| processor.process(input))
109 }
110
111 /// Access the underlying worker pool (for `fan_out_async`, scaling, etc.).
112 #[must_use]
113 pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
114 &self.pool
115 }
116
117 /// Access pipeline stats.
118 #[must_use]
119 pub fn stats(&self) -> &Arc<PipelineStats> {
120 &self.stats
121 }
122}