Skip to main content

hyperi_rustlib/worker/
batch.rs

1// Project:   hyperi-rustlib
2// File:      src/worker/batch.rs
3// Purpose:   BatchProcessor trait and BatchPipeline for parallel-then-sequential processing
4// Language:  Rust
5//
6// License:   FSL-1.1-ALv2
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Batch processing framework for DFE pipeline parallelisation.
10//!
11//! Provides the [`BatchProcessor`] trait for defining parallel-safe message
12//! processing, and [`BatchPipeline`] for orchestrating the parallel (rayon) →
13//! sequential (state mutation) pipeline.
14//!
15//! ## The Pattern
16//!
17//! Every DFE app follows the same structure:
18//!
19//! 1. **Parallel phase:** Process each message through a pure `&self` function
20//!    (parse, route, transform, enrich) -- via rayon `process_batch()`
21//! 2. **Sequential phase:** Apply results to mutable state (buffer push,
22//!    mark_pending, stats update, DLQ routing)
23//!
24//! The [`BatchProcessor`] trait captures phase 1. Phase 2 is app-specific
25//! (each app has different buffers, caches, and sinks).
26//!
27//! ## Example
28//!
29//! ```rust,ignore
30//! use hyperi_rustlib::worker::{BatchPipeline, BatchProcessor};
31//!
32//! struct MyProcessor<'a> { router: &'a Router, ... }
33//!
34//! impl BatchProcessor for MyProcessor<'_> {
35//!     type Input = KafkaMessage;
36//!     type Output = ProcessedMessage;
37//!     type Error = MyError;
38//!
39//!     fn process(&self, msg: &KafkaMessage) -> Result<ProcessedMessage, MyError> {
40//!         let parsed = sonic_rs::from_slice(&msg.payload)?;
41//!         let table = self.router.route(&parsed)?;
42//!         Ok(ProcessedMessage { table, data: parsed })
43//!     }
44//! }
45//!
46//! // In event loop:
47//! let processor = MyProcessor { router: &router, ... };
48//! let results = pipeline.process_batch(&processor, &batch);
49//! drop(processor); // release immutable borrows
50//! // Sequential phase: apply results to mutable state
51//! ```
52
53use std::sync::Arc;
54
55use super::pool::AdaptiveWorkerPool;
56use super::stats::PipelineStats;
57
/// Parallel-safe, per-message processing step driven by a [`BatchPipeline`].
///
/// Implementors are typically short-lived structs built once per batch that
/// hold only shared (`&`) borrows of read-only dependencies. Because the
/// implementor must be `Sync`, one instance can be shared by every rayon
/// worker while a batch is processed.
///
/// [`process`](Self::process) is expected to be pure: no mutable state, no
/// I/O, no `.await`. Dropping the processor before the sequential phase
/// releases its borrows, and the borrow checker enforces that ordering.
pub trait BatchProcessor
where
    Self: Sync,
{
    /// Value produced on success (e.g. `ProcessedMessage`, `CompressedBatch`).
    ///
    /// Bounded by `Send` so results can be moved back out of worker threads.
    type Output: Send;

    /// Value produced when a single message fails to process.
    ///
    /// Bounded by `Send` for the same reason as [`Self::Output`].
    type Error: Send;

    /// Message type consumed by the processor (e.g. `KafkaMessage`, `HttpRequest`).
    ///
    /// Bounded by `Sync` because workers only ever see inputs through `&`.
    type Input: Sync;

    /// Turn one input into either an output or an error.
    ///
    /// Should be side-effect free: no mutation, no I/O.
    fn process(&self, input: &Self::Input) -> Result<Self::Output, Self::Error>;
}
79
80/// Orchestrates parallel batch processing via [`AdaptiveWorkerPool`].
81///
82/// Wraps the worker pool with common DFE pipeline concerns: stats tracking,
83/// memory accounting, and metrics emission. Apps provide a [`BatchProcessor`]
84/// implementation; the pipeline handles the rest.
85pub struct BatchPipeline {
86    pool: Arc<AdaptiveWorkerPool>,
87    stats: Arc<PipelineStats>,
88}
89
90impl BatchPipeline {
91    /// Create a new batch pipeline.
92    #[must_use]
93    pub fn new(pool: Arc<AdaptiveWorkerPool>, stats: Arc<PipelineStats>) -> Self {
94        Self { pool, stats }
95    }
96
97    /// Process a batch in parallel via rayon.
98    ///
99    /// Tracks `received` stats automatically. Returns results in input order.
100    /// The caller handles the sequential phase (buffer push, DLQ, etc.).
101    pub fn process_batch<P: BatchProcessor>(
102        &self,
103        processor: &P,
104        batch: &[P::Input],
105    ) -> Vec<Result<P::Output, P::Error>> {
106        self.stats.add_received(batch.len() as u64);
107        self.pool
108            .process_batch(batch, |input| processor.process(input))
109    }
110
111    /// Access the underlying worker pool (for `fan_out_async`, scaling, etc.).
112    #[must_use]
113    pub fn pool(&self) -> &Arc<AdaptiveWorkerPool> {
114        &self.pool
115    }
116
117    /// Access pipeline stats.
118    #[must_use]
119    pub fn stats(&self) -> &Arc<PipelineStats> {
120        &self.stats
121    }
122}